feat: Add update-from-fork feature and fix GDrive

This commit is contained in:
Cursor User
2025-06-18 01:28:42 +02:00
parent 12e01b3f44
commit c4c64d3222
5 changed files with 222 additions and 109 deletions

View File

@@ -317,6 +317,21 @@ async def inline_alive(ult):
await ult.answer(result)
@ultroid_cmd(pattern="setrepo( (.*)|$)")
async def set_repo(event):
"""
Sets the upstream repository for updates.
Usage: .setrepo <your_fork_url>
"""
repo_url = event.pattern_match.group(2)
if not repo_url:
return await eor(event, "Please provide your forked repository URL. Example: `.setrepo https://github.com/user/repo`")
if not repo_url.endswith(".git"):
repo_url += ".git"
udB.set_key("UPSTREAM_REPO", repo_url)
await eor(event, f"Upstream repository has been set to: `{repo_url}`")
@ultroid_cmd(pattern="update( (.*)|$)")
async def _(e):
xx = await e.eor(get_string("upd_1"))
@@ -331,7 +346,13 @@ async def _(e):
os.execl(sys.executable, "python3", "-m", "pyUltroid")
# return
m = await updater()
branch = (Repo.init()).active_branch
try:
repo = Repo()
branch = repo.active_branch
except (NoSuchPathError, InvalidGitRepositoryError):
repo = None
branch = "main"
if m:
x = await asst.send_file(
udB.get_key("LOG_CHANNEL"),
@@ -347,8 +368,14 @@ async def _(e):
link_preview=False,
)
else:
repo_url = udB.get_key("UPSTREAM_REPO")
if not repo_url and repo:
repo_url = repo.remotes[0].config_reader.get("url")
elif not repo_url:
repo_url = "https://github.com/TeamUltroid/Ultroid"
await xx.edit(
f'<code>Your BOT is </code><strong>up-to-date</strong><code> with </code><strong><a href="https://github.com/TeamUltroid/Ultroid/tree/{branch}">[{branch}]</a></strong>',
f'<code>Your BOT is </code><strong>up-to-date</strong><code> with </code><strong><a href="{repo_url.replace(".git", "")}/tree/{branch}">[{branch}]</a></strong>',
parse_mode="html",
link_preview=False,
)

View File

@@ -62,6 +62,10 @@ async def gdown(event):
)
async def files(event):
GDrive = GDriveManager()
if not GDrive.creds or not GDrive.creds.valid:
return await event.eor(
"G-Drive credentials are not valid. Please try setting them up again via your assistant bot using `/start set`."
)
if not os.path.exists(GDrive.token_file):
return await event.eor(get_string("gdrive_6").format(asst.me.username))
eve = await event.eor(get_string("com_1"))

View File

@@ -5,28 +5,28 @@
# PLease read the GNU Affero General Public License in
# <https://github.com/TeamUltroid/pyUltroid/blob/main/LICENSE>.
import json
import os
import time
from io import FileIO
from logging import WARNING
from mimetypes import guess_type
from typing import Any, Dict, Optional
from apiclient.http import LOGGER, MediaFileUpload, MediaIoBaseDownload
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build, logger
from httplib2 import Http
from oauth2client.client import OOB_CALLBACK_URN, OAuth2WebServerFlow
from oauth2client.client import logger as _logger
from oauth2client.file import Storage
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from .. import udB
from .helper import humanbytes, time_formatter
for log in [LOGGER, logger, _logger]:
log.setLevel(WARNING)
logger.setLevel(WARNING)
class GDriveManager:
def __init__(self):
self._flow = {}
self.gdrive_creds = {
"oauth_scope": [
"https://www.googleapis.com/auth/drive",
@@ -34,73 +34,118 @@ class GDriveManager:
"https://www.googleapis.com/auth/drive.metadata",
],
"dir_mimetype": "application/vnd.google-apps.folder",
"redirect_uri": OOB_CALLBACK_URN,
}
client_id = (
udB.get_key("GDRIVE_CLIENT_ID")
or "458306970678-jhfbv6o5sf1ar63o1ohp4c0grblp8qba.apps.googleusercontent.com"
)
client_secret = (
udB.get_key("GDRIVE_CLIENT_SECRET")
or "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"
)
self.client_secrets = {
"installed": {
"client_id": client_id,
"client_secret": client_secret,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"redirect_uris": ["http://localhost:8080/"],
}
}
self.token_file = "resources/auth/gdrive_creds.json"
self.auth_token = udB.get_key("GDRIVE_AUTH_TOKEN")
self.folder_id = udB.get_key("GDRIVE_FOLDER_ID")
self.token_file = "resources/auth/gdrive_creds.json"
self.creds: Optional[Credentials] = None
if self.auth_token:
try:
self.creds = Credentials.from_authorized_user_info(
json.loads(self.auth_token), self.gdrive_creds["oauth_scope"]
)
except (json.JSONDecodeError, ValueError):
if os.path.exists(self.token_file):
os.remove(self.token_file)
self.auth_token = None
udB.del_key("GDRIVE_AUTH_TOKEN")
if not self.creds or not self.creds.valid:
if self.creds and self.creds.expired and self.creds.refresh_token:
try:
self.creds.refresh(Request())
if self.creds:
udB.set_key("GDRIVE_AUTH_TOKEN", self.creds.to_json())
with open(self.token_file, "w") as token:
token.write(self.creds.to_json())
except Exception:
self.creds = None
if os.path.exists(self.token_file):
os.remove(self.token_file)
udB.del_key("GDRIVE_AUTH_TOKEN")
@staticmethod
def _create_download_link(fileId: str):
def _create_download_link(fileId: str) -> str:
return f"https://drive.google.com/uc?id={fileId}&export=download"
@staticmethod
def _create_folder_link(folderId: str):
def _create_folder_link(folderId: str) -> str:
return f"https://drive.google.com/folderview?id={folderId}"
def _create_token_file(self, code: str = None):
if code and self._flow:
_auth_flow = self._flow["_"]
credentials = _auth_flow.step2_exchange(code)
Storage(self.token_file).put(credentials)
return udB.set_key("GDRIVE_AUTH_TOKEN", str(open(self.token_file).read()))
def _create_token_file(self, code: Optional[str] = None) -> Any:
flow = InstalledAppFlow.from_client_config(
self.client_secrets,
self.gdrive_creds["oauth_scope"],
redirect_uri="http://localhost:8080/",
)
if not code:
auth_url, _ = flow.authorization_url(prompt="consent")
return auth_url
try:
_auth_flow = OAuth2WebServerFlow(
udB.get_key("GDRIVE_CLIENT_ID")
or "458306970678-jhfbv6o5sf1ar63o1ohp4c0grblp8qba.apps.googleusercontent.com",
udB.get_key("GDRIVE_CLIENT_SECRET")
or "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",
self.gdrive_creds["oauth_scope"],
redirect_uri=self.gdrive_creds["redirect_uri"],
)
self._flow["_"] = _auth_flow
except KeyError:
return "Fill GDRIVE client credentials"
return _auth_flow.step1_get_authorize_url()
flow.fetch_token(code=code)
self.creds = flow.credentials
if self.creds:
with open(self.token_file, "w") as token:
token.write(self.creds.to_json())
udB.set_key("GDRIVE_AUTH_TOKEN", self.creds.to_json())
return True
except Exception as e:
return str(e)
@property
def _http(self):
storage = Storage(self.token_file)
creds = storage.get()
http = Http()
http.redirect_codes = http.redirect_codes - {308}
creds.refresh(http)
return creds.authorize(http)
def _build(self) -> Any:
if not self.creds or not self.creds.valid:
return None
return build("drive", "v2", credentials=self.creds, cache_discovery=False)
@property
def _build(self):
return build("drive", "v2", http=self._http, cache_discovery=False)
def _set_permissions(self, fileId: str):
def _set_permissions(self, fileId: str) -> None:
_permissions = {
"role": "reader",
"type": "anyone",
"value": None,
"withLink": True,
}
self._build.permissions().insert(
fileId=fileId, body=_permissions, supportsAllDrives=True
).execute(http=self._http)
service = self._build
if service:
service.permissions().insert(
fileId=fileId, body=_permissions, supportsAllDrives=True
).execute()
async def _upload_file(
self, event, path: str, filename: str = None, folder_id: str = None
):
self,
event: Any,
path: str,
filename: Optional[str] = None,
folder_id: Optional[str] = None,
) -> Any:
service = self._build
if not service:
raise Exception("Google Drive not authenticated.")
last_txt = ""
if not filename:
filename = path.split("/")[-1]
mime_type = guess_type(path)[0] or "application/octet-stream"
media_body = MediaFileUpload(path, mimetype=mime_type, resumable=True)
body = {
body: Dict[str, Any] = {
"title": filename,
"description": "Uploaded using Ultroid Userbot",
"mimeType": mime_type,
@@ -109,7 +154,7 @@ class GDriveManager:
body["parents"] = [{"id": folder_id}]
elif self.folder_id:
body["parents"] = [{"id": self.folder_id}]
upload = self._build.files().insert(
upload = service.files().insert(
body=body, media_body=media_body, supportsAllDrives=True
)
start = time.time()
@@ -121,13 +166,15 @@ class GDriveManager:
completed = _progress.resumable_progress
total_size = _progress.total_size
percentage = round((completed / total_size) * 100, 2)
speed = round(completed / diff, 2)
eta = round((total_size - completed) / speed, 2) * 1000
speed = round(completed / diff, 2) if diff > 0 else 0
eta = (
round((total_size - completed) / speed, 2) * 1000 if speed > 0 else 0
)
crnt_txt = (
f"`Uploading {filename} to GDrive...\n\n"
+ f"Status: {humanbytes(completed)}/{humanbytes(total_size)} »» {percentage}%\n"
+ f"Speed: {humanbytes(speed)}/s\n"
+ f"ETA: {time_formatter(eta)}`"
f"`Uploading {filename} to GDrive...\n\n`"
f"`Status: {humanbytes(completed)}/{humanbytes(total_size)} »» {percentage}%\n`"
f"`Speed: {humanbytes(speed)}/s\n`"
f"`ETA: {time_formatter(eta)}`"
)
if round((diff % 10.00) == 0) or last_txt != crnt_txt:
await event.edit(crnt_txt)
@@ -137,28 +184,34 @@ class GDriveManager:
self._set_permissions(fileId=fileId)
except BaseException:
pass
_url = self._build.files().get(fileId=fileId, supportsAllDrives=True).execute()
_url = service.files().get(fileId=fileId, supportsAllDrives=True).execute()
return _url.get("webContentLink")
async def _download_file(self, event, fileId: str, filename: str = None):
async def _download_file(
self, event: Any, fileId: str, filename: Optional[str] = None
) -> Any:
service = self._build
if not service:
return False, "Google Drive not authenticated."
last_txt = ""
if fileId.startswith("http"):
if "=download" in fileId:
fileId = fileId.split("=")[1][:-7]
fileId = fileId.split("=")[1].split("&")[0]
elif "/view" in fileId:
fileId = fileId.split("/")[::-1][1]
fileId = fileId.split("/")[-2]
try:
file_metadata = (
service.files().get(fileId=fileId, supportsAllDrives=True).execute()
)
if not filename:
filename = (
self._build.files()
.get(fileId=fileId, supportsAllDrives=True)
.execute()["title"]
)
downloader = self._build.files().get_media(
filename = file_metadata["title"]
downloader = service.files().get_media(
fileId=fileId, supportsAllDrives=True
)
except Exception as ex:
return False, str(ex)
if not filename:
return False, "Could not determine filename."
with FileIO(filename, "wb") as file:
start = time.time()
download = MediaIoBaseDownload(file, downloader)
@@ -170,13 +223,17 @@ class GDriveManager:
completed = _progress.resumable_progress
total_size = _progress.total_size
percentage = round((completed / total_size) * 100, 2)
speed = round(completed / diff, 2)
eta = round((total_size - completed) / speed, 2) * 1000
speed = round(completed / diff, 2) if diff > 0 else 0
eta = (
round((total_size - completed) / speed, 2) * 1000
if speed > 0
else 0
)
crnt_txt = (
f"`Downloading {filename} from GDrive...\n\n"
+ f"Status: {humanbytes(completed)}/{humanbytes(total_size)} »» {percentage}%\n"
+ f"Speed: {humanbytes(speed)}/s\n"
+ f"ETA: {time_formatter(eta)}`"
f"`Downloading {filename} from GDrive...\n\n`"
f"`Status: {humanbytes(completed)}/{humanbytes(total_size)} »» {percentage}%\n`"
f"`Speed: {humanbytes(speed)}/s\n`"
f"`ETA: {time_formatter(eta)}`"
)
if round((diff % 10.00) == 0) or last_txt != crnt_txt:
await event.edit(crnt_txt)
@@ -184,9 +241,12 @@ class GDriveManager:
return True, filename
@property
def _list_files(self):
def _list_files(self) -> Dict[str, str]:
service = self._build
if not service:
return {}
_items = (
self._build.files()
service.files()
.list(
supportsTeamDrives=True,
includeTeamDriveItems=True,
@@ -197,31 +257,37 @@ class GDriveManager:
.execute()
)
_files = {}
for files in _items["items"]:
for files in _items.get("items", []):
if files["mimeType"] == self.gdrive_creds["dir_mimetype"]:
_files[self._create_folder_link(files["id"])] = files["title"]
else:
_files[self._create_download_link(files["id"])] = files["title"]
return _files
def create_directory(self, directory):
body = {
def create_directory(self, directory: str) -> Any:
service = self._build
if not service:
return None
body: Dict[str, Any] = {
"title": directory,
"mimeType": self.gdrive_creds["dir_mimetype"],
}
if self.folder_id:
body["parents"] = [{"id": self.folder_id}]
file = self._build.files().insert(body=body, supportsAllDrives=True).execute()
file = service.files().insert(body=body, supportsAllDrives=True).execute()
fileId = file.get("id")
self._set_permissions(fileId=fileId)
return fileId
def search(self, title):
def search(self, title: str) -> Dict[str, str]:
service = self._build
if not service:
return {}
query = f"title contains '{title}'"
if self.folder_id:
query = f"'{self.folder_id}' in parents and (title contains '{title}')"
_items = (
self._build.files()
service.files()
.list(
supportsTeamDrives=True,
includeTeamDriveItems=True,
@@ -233,6 +299,6 @@ class GDriveManager:
.execute()
)
_files = {}
for files in _items["items"]:
for files in _items.get("items", []):
_files[self._create_download_link(files["id"])] = files["title"]
return _files

View File

@@ -18,6 +18,7 @@ from urllib.request import urlretrieve
from .. import run_as_module
if run_as_module:
from .. import udB
from ..configs import Var
@@ -185,9 +186,7 @@ if run_as_module:
await eod(ok, f"✓ `Ultroid - Installed`: `{plug}` ✓")
async def heroku_logs(event):
"""
post heroku logs
"""
from .. import LOGS
xx = await eor(event, "`Processing...`")
@@ -233,8 +232,8 @@ if run_as_module:
def gen_chlog(repo, diff):
"""Generate Changelogs..."""
UPSTREAM_REPO_URL = (
Repo().remotes[0].config_reader.get("url").replace(".git", "")
)
udB.get_key("UPSTREAM_REPO") or repo.remotes[0].config_reader.get("url")
).replace(".git", "")
ac_br = repo.active_branch.name
ch_log = tldr_log = ""
ch = f"<b>Ultroid {ultroid_version} updates for <a href={UPSTREAM_REPO_URL}/tree/{ac_br}>[{ac_br}]</a>:</b>"
@@ -263,7 +262,7 @@ async def bash(cmd, run_code=0):
err = stderr.decode().strip() or None
out = stdout.decode().strip()
if not run_code and err:
if match := re.match("\/bin\/sh: (.*): ?(\w+): not found", err):
if match := re.match(r"/bin/sh: (.*): ?(\w+): not found", err):
return out, f"{match.group(2).upper()}_NOT_FOUND"
return out, err
@@ -275,32 +274,45 @@ async def bash(cmd, run_code=0):
async def updater():
from .. import LOGS
try:
off_repo = Repo().remotes[0].config_reader.get("url").replace(".git", "")
except Exception as er:
LOGS.exception(er)
if not Repo:
LOGS.info("Git is not installed.")
return
try:
repo = Repo()
except NoSuchPathError as error:
LOGS.info(f"`directory {error} is not found`")
Repo().__del__()
return
except GitCommandError as error:
LOGS.info(f"`Early failure! {error}`")
Repo().__del__()
return
except InvalidGitRepositoryError:
repo = Repo.init()
origin = repo.create_remote("upstream", off_repo)
origin.fetch()
repo.create_head("main", origin.refs.main)
repo.heads.main.set_tracking_branch(origin.refs.main)
repo.heads.main.checkout(True)
except (NoSuchPathError, GitCommandError, InvalidGitRepositoryError) as e:
LOGS.info(f"Could not initialize git repo: {e}")
if isinstance(e, InvalidGitRepositoryError):
repo = Repo.init()
off_repo = (
udB.get_key("UPSTREAM_REPO") or "https://github.com/ThePrateekBhatia/Ultroid"
)
if "upstream" not in repo.remotes:
origin = repo.create_remote("upstream", off_repo)
origin.fetch()
repo.create_head("main", origin.refs.main)
repo.heads.main.set_tracking_branch(origin.refs.main)
repo.heads.main.checkout(True)
else:
return
ac_br = repo.active_branch.name
repo.create_remote("upstream", off_repo) if "upstream" not in repo.remotes else None
off_repo = udB.get_key("UPSTREAM_REPO") or repo.remotes[0].config_reader.get("url")
if "upstream" not in repo.remotes:
repo.create_remote("upstream", off_repo)
else:
repo.remote("upstream").set_url(off_repo)
ups_rem = repo.remote("upstream")
ups_rem.fetch(ac_br)
try:
ups_rem.fetch(ac_br)
except GitCommandError as e:
LOGS.info(f"Failed to fetch from upstream remote: {e}")
return False
changelog, tl_chnglog = await gen_chlog(repo, f"HEAD..upstream/{ac_br}")
return bool(changelog)

View File

@@ -1,3 +1,7 @@
pip-requirements
google-api-python-client
google-auth-httplib2
google-auth-oauthlib
# Important Requirements here.
telethon
gitpython