#!/usr/bin/env python
# coding: utf-8

###### Searching and Downloading Google Images to the local disk ######

import codecs
import datetime
import http.client
import json
import os
import re
import ssl
import sys
import time  # used to throttle requests and to time code execution
import urllib.request
from http.client import BadStatusLine
from urllib.parse import quote
from urllib.request import HTTPError, Request, URLError, urlopen

from .. import LOGS
from .tools import async_searcher

# Raised so that responses carrying a large number of headers do not make
# http.client bail out with a "got more than 100 headers" error.
http.client._MAXHEADERS = 1000
args_list = [
    "keywords",
    "keywords_from_file",
    "prefix_keywords",
    "suffix_keywords",
    "limit",
    "format",
    "color",
    "color_type",
    "usage_rights",
    "size",
    "exact_size",
    "aspect_ratio",
    "type",
    "time",
    "time_range",
    "delay",
    "url",
    "single_image",
    "output_directory",
    "image_directory",
    "no_directory",
    "proxy",
    "similar_images",
    "specific_site",
    "metadata",
    "extract_metadata",
    "socket_timeout",
    "thumbnail",
    "thumbnail_only",
    "language",
    "prefix",
    "chromedriver",
    "related_images",
    "safe_search",
    "no_numbering",
    "offset",
    "no_download",
    "save_source",
    "ignore_urls",
]
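
# A minimal usage sketch (the argument values are illustrative, not part of
# this module); any key from args_list that is left out is treated as None:
#
#     downloader = googleimagesdownload()
#     paths, errors = await downloader.download(
#         {"keywords": "sunset beach", "limit": 5, "format": "jpg"}
#     )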

class googleimagesdownload:
    def __init__(self):
        pass

    # Downloading entire Web Document (Raw Page Content)
    async def download_page(self, url):
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.104 Safari/537.36"
            }
            resp = await async_searcher(url, re_content=True, headers=headers)
            return str(resp)
        except Exception as er:
            LOGS.exception(
                "Could not open URL. Please check your internet connection and/or ssl settings. \n"
                "If you are using a proxy, make sure your proxy settings are configured correctly."
            )
            raise er

    # Download Page for more than 100 images
    def download_extended_page(self, url, chromedriver):
        from selenium import webdriver
        from selenium.webdriver.common.keys import Keys

        options = webdriver.ChromeOptions()
        options.add_argument("--no-sandbox")
        options.add_argument("--headless")

        try:
            browser = webdriver.Chrome(chromedriver, chrome_options=options)
        except Exception as e:
            LOGS.info(
                "Looks like we cannot locate the path to 'chromedriver' (use the '--chromedriver' "
                "argument to specify the path to the executable) or Google Chrome is not "
                "installed on your machine (exception: %s)",
                e,
            )
            sys.exit()
        browser.set_window_size(1024, 768)

        # Open the link
        browser.get(url)
        time.sleep(1)

        element = browser.find_element_by_tag_name("body")
        # Scroll down so lazy-loaded results are rendered
        for _ in range(30):
            element.send_keys(Keys.PAGE_DOWN)
            time.sleep(0.3)

        try:
            # click the "show more results" button ("smb") when present,
            # then keep scrolling
            browser.find_element_by_id("smb").click()
            for _ in range(50):
                element.send_keys(Keys.PAGE_DOWN)
                time.sleep(0.3)  # bot id protection
        except BaseException:
            for _ in range(10):
                element.send_keys(Keys.PAGE_DOWN)
                time.sleep(0.3)  # bot id protection

        time.sleep(0.5)

        source = browser.page_source  # page source
        # close the browser
        browser.close()

        return source

    # Correcting the escape characters for python2
    def replace_with_byte(self, match):
        return chr(int(match.group(0)[1:], 8))

    def repair(self, brokenjson):
        # up to 3 digits for byte values up to FF
        invalid_escape = re.compile(r"\\[0-7]{1,3}")
        return invalid_escape.sub(self.replace_with_byte, brokenjson)
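
    # Example: repair(r"\47hello\47") returns "'hello'", because \47 is the
    # octal escape for the apostrophe character (chr(0o47) == "'").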

    # Finding the next related-search tab in the given raw page
    def get_next_tab(self, s):
        start_line = s.find('class="dtviD"')
        if start_line == -1:  # If no links are found then give an error!
            end_quote = 0
            link = "no_tabs"
            return link, "", end_quote
        start_line = s.find('class="dtviD"')
        start_content = s.find('href="', start_line + 1)
        end_content = s.find('">', start_content + 1)
        url_item = "https://www.google.com" + str(s[start_content + 6 : end_content])
        url_item = url_item.replace("&amp;", "&")
        start_line_2 = s.find('class="dtviD"')
        s = s.replace("&amp;", "&")
        start_content_2 = s.find(":", start_line_2 + 1)
        end_content_2 = s.find("&usg=", start_content_2 + 1)
        url_item_name = str(s[start_content_2 + 1 : end_content_2])
        chars = url_item_name.find(",g_1:")
        chars_end = url_item_name.find(":", chars + 6)
        if chars_end == -1:
            updated_item_name = (url_item_name[chars + 5 :]).replace("+", " ")
        else:
            updated_item_name = (url_item_name[chars + 5 : chars_end]).replace("+", " ")
        return url_item, updated_item_name, end_content

    # Getting all related-search tabs with the help of 'get_next_tab'
    def get_all_tabs(self, page):
        tabs = {}
        while True:
            item, item_name, end_content = self.get_next_tab(page)
            if item == "no_tabs":
                break
            if len(item_name) > 100 or item_name == "background-color":
                break
            tabs[item_name] = item
            # Timer could be used to slow down the request for image
            # downloads
            time.sleep(0.1)
            page = page[end_content:]
        return tabs
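
    # The returned dict maps each related-search label to its Google Images
    # URL, e.g. (illustrative): {"night": "https://www.google.com/search?..."}.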

    # Format the object in readable format
    def format_object(self, object):
        data = object[1]
        main = data[3]
        info = data[9]
        return {
            "image_height": main[2],
            "image_width": main[1],
            "image_link": main[0],
            # everything after the last '.' in the link
            "image_format": main[0][main[0].rfind(".") + 1 :],
            "image_description": info["2003"][3],
            "image_source": info["2003"][2],
            "image_thumbnail_url": data[2][0],
        }
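
    # Sketch of the returned mapping for one raw result object (values are
    # illustrative): {"image_height": 1080, "image_width": 1920,
    # "image_link": "https://example.com/a.jpg", "image_format": "jpg", ...}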

    # function to download a single image
    def single_image(self, image_url):
        main_directory = "downloads"
        extensions = (".jpg", ".gif", ".png", ".bmp", ".svg", ".webp", ".ico")
        url = image_url
        os.makedirs(main_directory, exist_ok=True)
        req = Request(
            url,
            headers={
                "User-Agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17"
            },
        )

        response = urlopen(req, None, 10)
        data = response.read()
        response.close()

        # keep everything after the last '/' and strip any query string
        image_name = str(url[(url.rfind("/")) + 1 :])
        if "?" in image_name:
            image_name = image_name[: image_name.find("?")]
        if any(extension in image_name for extension in extensions):
            file_name = main_directory + "/" + image_name
        else:
            file_name = main_directory + "/" + image_name + ".jpg"
            image_name = image_name + ".jpg"

        with open(file_name, "wb") as output_file:
            output_file.write(data)
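
    # Usage sketch (hypothetical URL): googleimagesdownload().single_image(
    #     "https://example.com/cat.jpg") saves the file to downloads/cat.jpg.
    # Note this method is synchronous and blocks on urlopen().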

    def similar_images(self, similar_images):
        try:
            searchUrl = (
                "https://www.google.com/searchbyimage?site=search&sa=X&image_url="
                + similar_images
            )
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
            }

            req1 = urllib.request.Request(searchUrl, headers=headers)
            resp1 = urllib.request.urlopen(req1)
            content = str(resp1.read())
            l1 = content.find("AMhZZ")
            l2 = content.find("&", l1)
            urll = content[l1:l2]

            newurl = (
                "https://www.google.com/search?tbs=sbi:" + urll + "&site=search&sa=X"
            )
            req2 = urllib.request.Request(newurl, headers=headers)
            urllib.request.urlopen(req2)
            l3 = content.find("/search?sa=X&q=")
            l4 = content.find(";", l3 + 19)
            return content[l3 + 19 : l4]
        except BaseException:
            return "Could not connect to Google Images endpoint"

    # Building URL parameters
    def build_url_parameters(self, arguments):
        if arguments["language"]:
            lang = "&lr="
            lang_param = {
                "Arabic": "lang_ar",
                "Chinese (Simplified)": "lang_zh-CN",
                "Chinese (Traditional)": "lang_zh-TW",
                "Czech": "lang_cs",
                "Danish": "lang_da",
                "Dutch": "lang_nl",
                "English": "lang_en",
                "Estonian": "lang_et",
                "Finnish": "lang_fi",
                "French": "lang_fr",
                "German": "lang_de",
                "Greek": "lang_el",
                "Hebrew": "lang_iw",
                "Hungarian": "lang_hu",
                "Icelandic": "lang_is",
                "Italian": "lang_it",
                "Japanese": "lang_ja",
                "Korean": "lang_ko",
                "Latvian": "lang_lv",
                "Lithuanian": "lang_lt",
                "Norwegian": "lang_no",
                "Portuguese": "lang_pt",
                "Polish": "lang_pl",
                "Romanian": "lang_ro",
                "Russian": "lang_ru",
                "Spanish": "lang_es",
                "Swedish": "lang_sv",
                "Turkish": "lang_tr",
            }
            lang_url = lang + lang_param[arguments["language"]]
        else:
            lang_url = ""

        if arguments["time_range"]:
            json_acceptable_string = arguments["time_range"].replace("'", '"')
            d = json.loads(json_acceptable_string)
            time_range = ",cdr:1,cd_min:" + d["time_min"] + ",cd_max:" + d["time_max"]
        else:
            time_range = ""

        if arguments["exact_size"]:
            size_array = [x.strip() for x in arguments["exact_size"].split(",")]
            exact_size = (
                ",isz:ex,iszw:" + str(size_array[0]) + ",iszh:" + str(size_array[1])
            )
        else:
            exact_size = ""

        built_url = "&tbs="
        params = {
            "color": [
                arguments["color"],
                {
                    "red": "ic:specific,isc:red",
                    "orange": "ic:specific,isc:orange",
                    "yellow": "ic:specific,isc:yellow",
                    "green": "ic:specific,isc:green",
                    "teal": "ic:specific,isc:teal",
                    "blue": "ic:specific,isc:blue",
                    "purple": "ic:specific,isc:purple",
                    "pink": "ic:specific,isc:pink",
                    "white": "ic:specific,isc:white",
                    "gray": "ic:specific,isc:gray",
                    "black": "ic:specific,isc:black",
                    "brown": "ic:specific,isc:brown",
                },
            ],
            "color_type": [
                arguments["color_type"],
                {
                    "full-color": "ic:color",
                    "black-and-white": "ic:gray",
                    "transparent": "ic:trans",
                },
            ],
            "usage_rights": [
                arguments["usage_rights"],
                {
                    "labeled-for-reuse-with-modifications": "sur:fmc",
                    "labeled-for-reuse": "sur:fc",
                    "labeled-for-noncommercial-reuse-with-modification": "sur:fm",
                    "labeled-for-noncommercial-reuse": "sur:f",
                },
            ],
            "size": [
                arguments["size"],
                {
                    "large": "isz:l",
                    "medium": "isz:m",
                    "icon": "isz:i",
                    ">400*300": "isz:lt,islt:qsvga",
                    ">640*480": "isz:lt,islt:vga",
                    ">800*600": "isz:lt,islt:svga",
                    ">1024*768": "isz:lt,islt:xga",
                    ">2MP": "isz:lt,islt:2mp",
                    ">4MP": "isz:lt,islt:4mp",
                    ">6MP": "isz:lt,islt:6mp",
                    ">8MP": "isz:lt,islt:8mp",
                    ">10MP": "isz:lt,islt:10mp",
                    ">12MP": "isz:lt,islt:12mp",
                    ">15MP": "isz:lt,islt:15mp",
                    ">20MP": "isz:lt,islt:20mp",
                    ">40MP": "isz:lt,islt:40mp",
                    ">70MP": "isz:lt,islt:70mp",
                },
            ],
            "type": [
                arguments["type"],
                {
                    "face": "itp:face",
                    "photo": "itp:photo",
                    "clipart": "itp:clipart",
                    "line-drawing": "itp:lineart",
                    "animated": "itp:animated",
                },
            ],
            "time": [
                arguments["time"],
                {
                    "past-24-hours": "qdr:d",
                    "past-7-days": "qdr:w",
                    "past-month": "qdr:m",
                    "past-year": "qdr:y",
                },
            ],
            "aspect_ratio": [
                arguments["aspect_ratio"],
                {
                    "tall": "iar:t",
                    "square": "iar:s",
                    "wide": "iar:w",
                    "panoramic": "iar:xw",
                },
            ],
            "format": [
                arguments["format"],
                {
                    "jpg": "ift:jpg",
                    "gif": "ift:gif",
                    "png": "ift:png",
                    "bmp": "ift:bmp",
                    "svg": "ift:svg",
                    "webp": "ift:webp",
                    "ico": "ift:ico",
                    "raw": "ift:craw",
                },
            ],
        }

        # join every requested filter into the single "tbs" query parameter
        ext_params = [
            value[1][value[0]] for value in params.values() if value[0] is not None
        ]
        built_url += ",".join(ext_params)
        return lang_url + built_url + exact_size + time_range
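
    # For example, arguments with color="red" and type="photo" (every other
    # filter None) produce "&tbs=ic:specific,isc:red,itp:photo"; a language
    # choice is prepended as "&lr=..." and the exact_size/time_range segments
    # are appended when present.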

    # building main search URL
    def build_search_url(
        self, search_term, params, url, similar_images, specific_site, safe_search
    ):
        # check the args and choose the URL
        if url:
            pass  # an explicitly supplied URL overrides everything else
        elif similar_images:
            keywordem = self.similar_images(similar_images)
            url = (
                "https://www.google.com/search?q="
                + keywordem
                + "&espv=2&biw=1366&bih=667&site=webhp&source=lnms&tbm=isch&sa=X&ei=XosDVaCXD8TasATItgE&ved=0CAcQ_AUoAg"
            )
        elif specific_site:
            url = (
                "https://www.google.com/search?q="
                + quote(search_term.encode("utf-8"))
                + "&as_sitesearch="
                + specific_site
                + "&espv=2&biw=1366&bih=667&site=webhp&source=lnms&tbm=isch"
                + params
                + "&sa=X&ei=XosDVaCXD8TasATItgE&ved=0CAcQ_AUoAg"
            )
        else:
            url = (
                "https://www.google.com/search?q="
                + quote(search_term.encode("utf-8"))
                + "&espv=2&biw=1366&bih=667&site=webhp&source=lnms&tbm=isch"
                + params
                + "&sa=X&ei=XosDVaCXD8TasATItgE&ved=0CAcQ_AUoAg"
            )

        # safe search check
        if safe_search:
            url = url + "&safe=active"

        return url
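
    # Illustrative call: build_search_url("cats", "&tbs=itp:photo", None,
    # None, None, True) returns a URL of the form
    # "https://www.google.com/search?q=cats&...&tbm=isch&tbs=itp:photo&...&safe=active".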

    # measures the file size
    def file_size(self, file_path):
        if os.path.isfile(file_path):
            file_info = os.stat(file_path)
            size = file_info.st_size
            for x in ["bytes", "KB", "MB", "GB", "TB"]:
                if size < 1024.0:
                    return "%3.1f %s" % (size, x)
                size /= 1024.0
            return size
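
    # Example: a 5242880-byte file yields "5.0 MB"; extremely large files
    # fall through the loop and are returned as a bare float.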

    # keywords from file
    def keywords_from_file(self, file_name):
        search_keyword = []
        with codecs.open(file_name, "r", encoding="utf-8-sig") as f:
            if file_name.endswith((".csv", ".txt")):
                for line in f:
                    if line not in ["\n", "\r\n"]:
                        search_keyword.append(line.replace("\n", "").replace("\r", ""))
            else:
                LOGS.info(
                    "Invalid file type: Valid file types are either .txt or .csv \n"
                    "exiting..."
                )
                sys.exit()
        return search_keyword
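
    # The file is read one keyword per line; e.g. a keywords.txt containing
    # the two lines "cats" and "dogs" (hypothetical) yields ["cats", "dogs"].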

    # make directories
    def create_directories(self, main_directory, dir_name, thumbnail, thumbnail_only):
        dir_name_thumbnail = dir_name + " - thumbnail"
        # make a search keyword directory
        try:
            if not os.path.exists(main_directory):
                os.makedirs(main_directory)
                time.sleep(0.15)
            sub_directory = os.path.join(main_directory, dir_name)
            if not os.path.exists(sub_directory):
                os.makedirs(sub_directory)
            if thumbnail or thumbnail_only:
                sub_directory_thumbnail = os.path.join(
                    main_directory, dir_name_thumbnail
                )
                if not os.path.exists(sub_directory_thumbnail):
                    os.makedirs(sub_directory_thumbnail)
        except OSError as e:
            if e.errno != 17:  # errno 17: directory already exists
                raise

    # Download Image thumbnails
    def download_image_thumbnail(
        self,
        image_url,
        main_directory,
        dir_name,
        return_image_name,
        socket_timeout,
        no_download,
        save_source,
        img_src,
        ignore_urls,  # passed by the caller; filtering already happens in download_image()
    ):
        if no_download:
            return "success", "Printed url without downloading"
        try:
            req = Request(
                image_url,
                headers={
                    "User-Agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17"
                },
            )
            try:
                # timeout time to download an image
                timeout = float(socket_timeout) if socket_timeout else 10
                response = urlopen(req, None, timeout)
                data = response.read()
                response.close()

                path = (
                    main_directory
                    + "/"
                    + dir_name
                    + " - thumbnail"
                    + "/"
                    + return_image_name
                )

                try:
                    with open(path, "wb") as output_file:
                        output_file.write(data)
                    if save_source:
                        list_path = main_directory + "/" + save_source + ".txt"
                        with open(list_path, "a") as list_file:
                            list_file.write(path + "\t" + img_src + "\n")
                except OSError as e:
                    download_status = "fail"
                    download_message = (
                        "OSError on an image...trying next one..." + " Error: " + str(e)
                    )
                else:
                    # only report success when the write actually succeeded
                    download_status = "success"
                    download_message = (
                        "Completed Image Thumbnail ====> " + return_image_name
                    )

            except UnicodeEncodeError as e:
                download_status = "fail"
                download_message = (
                    "UnicodeEncodeError on an image...trying next one..."
                    + " Error: "
                    + str(e)
                )

        except HTTPError as e:  # If there is any HTTPError
            download_status = "fail"
            download_message = (
                "HTTPError on an image...trying next one..." + " Error: " + str(e)
            )

        except URLError as e:
            download_status = "fail"
            download_message = (
                "URLError on an image...trying next one..." + " Error: " + str(e)
            )

        except ssl.CertificateError as e:
            download_status = "fail"
            download_message = (
                "CertificateError on an image...trying next one..."
                + " Error: "
                + str(e)
            )

        except IOError as e:  # If there is any IOError
            download_status = "fail"
            download_message = (
                "IOError on an image...trying next one..." + " Error: " + str(e)
            )
        return download_status, download_message

    # Download Images
    def download_image(
        self,
        image_url,
        image_format,
        main_directory,
        dir_name,
        count,
        socket_timeout,
        prefix,
        no_numbering,
        no_download,
        save_source,
        img_src,
        thumbnail_only,
        format,
        ignore_urls,
    ):
        if ignore_urls and any(url in image_url for url in ignore_urls.split(",")):
            return (
                "fail",
                "Image ignored due to 'ignore url' parameter",
                None,
                image_url,
            )
        if thumbnail_only:
            return (
                "success",
                "Skipping image download...",
                str(image_url[(image_url.rfind("/")) + 1 :]),
                image_url,
            )
        if no_download:
            return "success", "Printed url without downloading", None, image_url
        try:
            req = Request(
                image_url,
                headers={
                    "User-Agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17"
                },
            )
            try:
                # timeout time to download an image
                timeout = float(socket_timeout) if socket_timeout else 10
                response = urlopen(req, None, timeout)
                data = response.read()
                response.close()

                extensions = [
                    ".jpg",
                    ".jpeg",
                    ".gif",
                    ".png",
                    ".bmp",
                    ".svg",
                    ".webp",
                    ".ico",
                ]
                # keep everything after the last '/'
                image_name = str(image_url[(image_url.rfind("/")) + 1 :])
                if format and (not image_format or image_format != format):
                    download_status = "fail"
                    download_message = "Wrong image format returned. Skipping..."
                    return_image_name = ""
                    absolute_path = ""
                    return (
                        download_status,
                        download_message,
                        return_image_name,
                        absolute_path,
                    )

                if not image_format or "." + image_format not in extensions:
                    download_status = "fail"
                    download_message = "Invalid or missing image format. Skipping..."
                    return_image_name = ""
                    absolute_path = ""
                    return (
                        download_status,
                        download_message,
                        return_image_name,
                        absolute_path,
                    )
                if image_name.lower().find("." + image_format) < 0:
                    image_name = image_name + "." + image_format
                else:
                    image_name = image_name[
                        : image_name.lower().find("." + image_format)
                        + (len(image_format) + 1)
                    ]

                # prefix name in image
                prefix = prefix + " " if prefix else ""
                if no_numbering:
                    path = main_directory + "/" + dir_name + "/" + prefix + image_name
                else:
                    path = (
                        main_directory
                        + "/"
                        + dir_name
                        + "/"
                        + prefix
                        + str(count)
                        + "."
                        + image_name
                    )
                try:
                    with open(path, "wb") as output_file:
                        output_file.write(data)
                    if save_source:
                        list_path = main_directory + "/" + save_source + ".txt"
                        with open(list_path, "a") as list_file:
                            list_file.write(path + "\t" + img_src + "\n")
                    absolute_path = os.path.abspath(path)
                except OSError as e:
                    download_status = "fail"
                    download_message = (
                        "OSError on an image...trying next one..." + " Error: " + str(e)
                    )
                    return_image_name = ""
                    absolute_path = ""
                else:
                    # return image name back to calling method to use it for
                    # thumbnail downloads
                    download_status = "success"
                    download_message = (
                        "Completed Image ====> " + prefix + str(count) + "." + image_name
                    )
                    return_image_name = prefix + str(count) + "." + image_name

            except UnicodeEncodeError as e:
                download_status = "fail"
                download_message = (
                    "UnicodeEncodeError on an image...trying next one..."
                    + " Error: "
                    + str(e)
                )
                return_image_name = ""
                absolute_path = ""

            except BadStatusLine as e:
                download_status = "fail"
                download_message = (
                    "BadStatusLine on an image...trying next one..."
                    + " Error: "
                    + str(e)
                )
                return_image_name = ""
                absolute_path = ""

        # HTTPError must be caught before URLError, since it is a subclass
        except HTTPError as e:  # If there is any HTTPError
            download_status = "fail"
            download_message = (
                "HTTPError on an image...trying next one..." + " Error: " + str(e)
            )
            return_image_name = ""
            absolute_path = ""

        except URLError as e:
            download_status = "fail"
            download_message = (
                "URLError on an image...trying next one..." + " Error: " + str(e)
            )
            return_image_name = ""
            absolute_path = ""

        except ssl.CertificateError as e:
            download_status = "fail"
            download_message = (
                "CertificateError on an image...trying next one..."
                + " Error: "
                + str(e)
            )
            return_image_name = ""
            absolute_path = ""

        except IOError as e:  # If there is any IOError
            download_status = "fail"
            download_message = (
                "IOError on an image...trying next one..." + " Error: " + str(e)
            )
            return_image_name = ""
            absolute_path = ""

        return download_status, download_message, return_image_name, absolute_path

    # Finding 'Next Image' from the given raw page
    def _get_next_item(self, s):
        start_line = s.find("rg_meta notranslate")
        if start_line == -1:  # If no links are found then give an error!
            end_quote = 0
            link = "no_links"
            return link, end_quote
        start_line = s.find('class="rg_meta notranslate">')
        start_object = s.find("{", start_line + 1)
        end_object = s.find("</div>", start_object + 1)
        object_raw = str(s[start_object:end_object])
        # remove escape characters
        try:
            object_decode = bytes(object_raw, "utf-8").decode("unicode_escape")
            final_object = json.loads(object_decode)
        except BaseException:
            final_object = ""
        return final_object, end_object

    # Extracting the raw image metadata objects from the page source
    def _get_image_objects(self, s):
        # locate the AF_initDataCallback block that carries the image metadata
        start_line = s.find("AF_initDataCallback({key: \\'ds:1\\'") - 10
        start_object = s.find("[", start_line + 1)
        end_object = s.find("</script>", start_object + 1) - 4
        object_raw = str(s[start_object:end_object])
        object_decode = bytes(object_raw[:-1], "utf-8").decode("unicode_escape")
        # the index path below is tied to Google's current response layout
        # and breaks whenever that layout changes
        return json.loads(object_decode[:-15])[31][0][12][2]

    def _get_all_items(self, page, main_directory, dir_name, limit, arguments):
        items = []
        abs_path = []
        errorCount = 0
        i = 0
        count = 1
        image_objects = self._get_image_objects(page)
        while count < limit + 1:
            # stop when the result objects are exhausted (also covers an
            # empty page); otherwise failed items would loop forever
            if i >= len(image_objects):
                LOGS.info("no_links")
                break
            # format the item for readability
            try:
                object = self.format_object(image_objects[i])
                # download the images
                (
                    download_status,
                    download_message,
                    return_image_name,
                    absolute_path,
                ) = self.download_image(
                    object["image_link"],
                    object["image_format"],
                    main_directory,
                    dir_name,
                    count,
                    arguments["socket_timeout"],
                    arguments["prefix"],
                    arguments["no_numbering"],
                    arguments["no_download"],
                    arguments["save_source"],
                    object["image_source"],
                    arguments["thumbnail_only"],
                    arguments["format"],
                    arguments["ignore_urls"],
                )
            except (TypeError, IndexError) as er:
                LOGS.debug(er)
                download_status = None

            if download_status == "success":
                # download image_thumbnails
                if arguments["thumbnail"] or arguments["thumbnail_only"]:
                    (
                        download_status,
                        download_message_thumbnail,
                    ) = self.download_image_thumbnail(
                        object["image_thumbnail_url"],
                        main_directory,
                        dir_name,
                        return_image_name,
                        arguments["socket_timeout"],
                        arguments["no_download"],
                        arguments["save_source"],
                        object["image_source"],
                        arguments["ignore_urls"],
                    )

                count += 1
                object["image_filename"] = return_image_name
                items.append(object)
                abs_path.append(absolute_path)
            else:
                errorCount += 1

            # delay param
            if arguments["delay"]:
                time.sleep(int(arguments["delay"]))
            i += 1
        if count < limit:
            LOGS.info(
                "Unfortunately only %s of the requested %s images could be "
                "downloaded; the rest were not downloadable for this search "
                "filter.",
                count - 1,
                limit,
            )
        return items, errorCount, abs_path

    # Bulk Download
    async def download(self, arguments):
        paths_agg = {}
        # if the caller supplies a config_file param, run one download per record
        if "config_file" in arguments:
            records = []
            json_file = json.load(open(arguments["config_file"]))
            for item in json_file["Records"]:
                arguments = {i: None for i in args_list}
                for key, value in item.items():
                    arguments[key] = value
                records.append(arguments)
            total_errors = 0
            for rec in records:
                paths, errors = await self.download_executor(rec)
                for i in paths:
                    paths_agg[i] = paths[i]
                total_errors += errors
            return paths_agg, total_errors

        # otherwise the caller passes the params directly
        paths, errors = await self.download_executor(arguments)
        for i in paths:
            paths_agg[i] = paths[i]
        return paths_agg, errors
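
    # The config file read above is expected to look like (sketch):
    #     {"Records": [{"keywords": "cats", "limit": 10},
    #                  {"keywords": "dogs", "color": "black"}]}
    # Each record is padded with None for every args_list key it omits.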

    async def download_executor(self, arguments):
        paths = {}
        errorCount = 0
        for arg in args_list:
            if arg not in arguments:
                arguments[arg] = None
        # single_image mode bypasses the keyword search entirely (without
        # this, a single_image-only call would never set search_keyword)
        if arguments["single_image"]:
            self.single_image(arguments["single_image"])
            return paths, errorCount
        # Initialization and Validation of user arguments
        if arguments["keywords"]:
            search_keyword = [str(item) for item in arguments["keywords"].split(",")]

        if arguments["keywords_from_file"]:
            search_keyword = self.keywords_from_file(arguments["keywords_from_file"])

        # both time and time range should not be allowed in the same query
        if arguments["time"] and arguments["time_range"]:
            raise ValueError(
                "Either time or time range should be used in a query. Both cannot be used at the same time."
            )

        # both size and exact size should not be allowed in the same query
        if arguments["size"] and arguments["exact_size"]:
            raise ValueError(
                'Either "size" or "exact_size" should be used in a query. Both cannot be used at the same time.'
            )

        # both image directory and no image directory should not be allowed
        # in the same query
        if arguments["image_directory"] and arguments["no_directory"]:
            raise ValueError(
                "You can either specify an image directory or specify no image directory, not both!"
            )

        # Additional words appended to keywords
        if arguments["suffix_keywords"]:
            suffix_keywords = [
                " " + str(sk) for sk in arguments["suffix_keywords"].split(",")
            ]
        else:
            suffix_keywords = [""]

        # Additional words prepended to keywords
        if arguments["prefix_keywords"]:
            prefix_keywords = [
                str(sk) + " " for sk in arguments["prefix_keywords"].split(",")
            ]
        else:
            prefix_keywords = [""]

        # Setting limit on number of images to be downloaded
        limit = int(arguments["limit"]) if arguments["limit"] else 100
        if arguments["url"]:
            current_time = str(datetime.datetime.now()).split(".")[0]
            search_keyword = [current_time.replace(":", "_")]

        if arguments["similar_images"]:
            current_time = str(datetime.datetime.now()).split(".")[0]
            search_keyword = [current_time.replace(":", "_")]

        # If the single_image or url argument is not present, then keywords
        # is a mandatory argument
        if (
            arguments["single_image"] is None
            and arguments["url"] is None
            and arguments["similar_images"] is None
            and arguments["keywords"] is None
            and arguments["keywords_from_file"] is None
        ):
            LOGS.info(
                "-------------------------------\n"
                "Uh oh! Keywords is a required argument \n\n"
                "Please refer to the documentation on guide to writing queries \n"
                "https://github.com/hardikvasa/google-images-download#examples"
                "\n\nexiting!\n"
                "-------------------------------"
            )
            sys.exit()

        # If this argument is present, set the custom output directory
        main_directory = arguments["output_directory"] or "downloads"
        # Proxy settings
        if arguments["proxy"]:
            os.environ["http_proxy"] = arguments["proxy"]
            os.environ["https_proxy"] = arguments["proxy"]
        # Initialization Complete
        total_errors = 0
        for pky in prefix_keywords:  # 1. for every prefix keyword
            for sky in suffix_keywords:  # 2. for every suffix keyword
                for e in search_keyword:  # 3. for every main keyword
                    search_term = pky + e + sky

                    if arguments["image_directory"]:
                        dir_name = arguments["image_directory"]
                    elif arguments["no_directory"]:
                        dir_name = ""
                    else:
                        dir_name = search_term + (
                            "-" + arguments["color"] if arguments["color"] else ""
                        )  # sub-directory

                    if not arguments["no_download"]:
                        self.create_directories(
                            main_directory,
                            dir_name,
                            arguments["thumbnail"],
                            arguments["thumbnail_only"],
                        )  # create directories in OS

                    params = self.build_url_parameters(
                        arguments
                    )  # building URL with params

                    url = self.build_search_url(
                        search_term,
                        params,
                        arguments["url"],
                        arguments["similar_images"],
                        arguments["specific_site"],
                        arguments["safe_search"],
                    )  # building main search url

                    if limit < 101:
                        # download page
                        raw_html = await self.download_page(url)
                    else:
                        raw_html = self.download_extended_page(
                            url, arguments["chromedriver"]
                        )

                    items, errorCount, abs_path = self._get_all_items(
                        raw_html, main_directory, dir_name, limit, arguments
                    )  # get all image items and download images
                    paths[pky + e + sky] = abs_path

                    # dumps the collected metadata into a json file
                    if arguments["extract_metadata"]:
                        try:
                            if not os.path.exists("logs"):
                                os.makedirs("logs")
                        except OSError as err:
                            # use a separate name so the keyword 'e' used in
                            # the filename below is not unbound by the handler
                            LOGS.exception(err)
                        with open("logs/" + e + ".json", "w") as json_file:
                            json.dump(items, json_file, indent=4, sort_keys=True)
                    # Related images
                    if arguments["related_images"]:
                        tabs = self.get_all_tabs(raw_html)
                        for key, value in tabs.items():
                            final_search_term = search_term + " - " + key
                            if limit < 101:
                                new_raw_html = await self.download_page(
                                    value
                                )  # download page
                            else:
                                new_raw_html = self.download_extended_page(
                                    value, arguments["chromedriver"]
                                )
                            self.create_directories(
                                main_directory,
                                final_search_term,
                                arguments["thumbnail"],
                                arguments["thumbnail_only"],
                            )
                            self._get_all_items(
                                new_raw_html,
                                main_directory,
                                final_search_term,
                                limit,
                                arguments,
                            )

                    total_errors += errorCount
        return paths, total_errors