#!/usr/bin/python3
"""Connection to webpage are manage with requests module.
Thoses errors are waiting for: timeout with socket module and with urllib3 mudule
and all RequestException errors."""
import requests
from urllib.parse import urlparse
from reppy.cache import RobotsCache
from reppy.exceptions import ServerError
from swiftea_bot.data import USER_AGENT, HEADERS, TIMEOUT
from swiftea_bot.module import tell, remove_duplicates
from crawling import parsers, connection
from crawling.searches import clean_link
class WebConnection(object):
    """Manage the web connection with the pages to crawl."""
    def __init__(self):
        self.reqrobots = RobotsCache(capacity=100)
        self.parser_encoding = parsers.ExtractEncoding()
    def get_code(self, url):
        """Get the source code of the given url.

        :param url: url of the webpage
        :type url: str
        :return: new url, source code, True if links must not be followed,
            encoding score and list of urls to delete (redirections)
        """
        nofollow, url = connection.is_nofollow(url)
        result = self.send_request(url)
        if not isinstance(result, requests.models.Response):
            return None, result, None, None, url
        else:
            request = result
            del result
        allowed = self.check_robots_perm(url)
        if request.status_code == requests.codes.ok and request.headers.get('Content-Type', '').startswith('text/html') and allowed:
            # Search the encoding of the webpage:
            request.encoding, score = self.search_encoding(request.headers, request.text)
            new_url, code = self.duplicate_content(request, url)  # new_url is clean and maybe without params
            all_urls = connection.all_urls(request)  # List of urls to delete
            if new_url in all_urls:  # new_url must not be deleted
                all_urls.remove(new_url)
            return new_url, code, nofollow, score, all_urls
        else:
            tell('Webpage infos: status code=' + str(request.status_code) + ', Content-Type=' +
                request.headers.get('Content-Type', '') + ', robots perm=' + str(allowed), severity=0)
            # All redirection urls, from the first to the last:
            all_urls = connection.all_urls(request)
            all_urls.append(request.url)
            all_urls.append(url)
            return None, 'ignore', None, None, remove_duplicates(all_urls)
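    # Return-shape sketch (illustrative only; 'http://example.com/' is an assumption):
    #   new_url, code, nofollow, score, all_urls = web_connection.get_code('http://example.com/')
    # On success `code` is the html source; if the request failed `code` is None
    # or 'no connection' and the last value is the requested url; if the page is
    # skipped `code` is 'ignore' and the last value lists the redirection urls.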
    def send_request(self, url):
        """Send a GET request to the given url.

        :param url: url of the webpage
        :type url: str
        :return: the response, 'no connection' if the network is down, None on error
        """
        try:
            request = requests.get(url, headers=HEADERS, timeout=TIMEOUT)
        except requests.packages.urllib3.exceptions.ReadTimeoutError:
            tell('Read timeout error (urllib3): ' + url, 3)
            return None
        except requests.exceptions.Timeout:
            tell('Timeout error: ' + url, 4)
            return None
        except (requests.exceptions.RequestException, requests.exceptions.ConnectionError) as error:
            tell('Connection failed: {}, {}'.format(str(error), url), 5)
            if connection.no_connection():
                return 'no connection'
            else:
                return None
        except UnicodeDecodeError as error:
            tell('UnicodeDecodeError: ' + str(error))
            return None
        else:
            return request
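    # Minimal handling sketch (illustrative; it mirrors the check done in get_code):
    #   result = web_connection.send_request(url)
    #   if isinstance(result, requests.models.Response):
    #       ...  # crawl the page
    #   elif result == 'no connection':
    #       ...  # the network is down, stop or retry later
    #   else:
    #       ...  # skip this url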
    def search_encoding(self, headers, code):
        """Search the encoding of a webpage in its headers and source code.

        If an encoding is found, the score is 1; otherwise the score is 0
        and the encoding defaults to utf-8.

        :param headers: headers of the response
        :type headers: dict
        :param code: source code
        :type code: str
        :return: encoding of the webpage and its score
        """
        # Search in the headers:
        headers = str(headers).lower()
        charset = headers.find('charset')
        end_charset = headers.find('\'', charset)
        if charset != -1 and end_charset != -1:
            return headers[charset+8:end_charset], 1
        else:
            # Search in the source code:
            self.parser_encoding.feed(code)
            if self.parser_encoding.encoding != '':
                return self.parser_encoding.encoding, 1
            else:
                tell('No encoding', 9, severity=0)
                return 'utf-8', 0
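    # Worked example (sketch, assuming a typical Content-Type header): when
    # str(headers).lower() contains "... 'text/html; charset=utf-8' ...",
    # headers.find('charset') points at "charset", charset+8 skips "charset="
    # and the next "'" closes the value, so the slice yields 'utf-8'.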
    def check_robots_perm(self, url):
        """Check the robots.txt of the website for permission.

        :param url: webpage url
        :type url: str
        :return: True if crawling is allowed
        """
        try:
            allowed = self.reqrobots.allowed(url, USER_AGENT)
        except ServerError as error:
            tell('Error robots.txt (reppy): ' + str(error) + ' ' + url, 6)
            allowed = True
        except requests.exceptions.Timeout:
            tell('Error robots.txt (timeout): ' + url)
            allowed = True
        except requests.exceptions.RequestException as error:
            tell('Error robots.txt (requests): ' + str(error) + ' ' + url, 7)
            allowed = True
        except Exception as error:
            tell('Unknown robots.txt error: ' + str(error) + ' ' + url, 8)
            allowed = True
        # Errors fail open: a broken or unreachable robots.txt never blocks crawling.
        return allowed
    def duplicate_content(self, request1, url):
        """Avoid duplicates caused by url parameters.

        Compare the source code fetched with and without parameters and
        return the url without parameters if the content is the same.

        :param request1: response of the crawled page
        :type request1: requests.models.Response
        :param url: webpage url
        :type url: str
        :return: url and source code to keep
        """
        url1 = clean_link(request1.url)
        if url1 is None:
            return url, request1.text
        infos_url = urlparse(url1)
        if infos_url.query != '':
            new_url = infos_url.scheme + '://' + infos_url.netloc + infos_url.path
            request2 = self.send_request(new_url)
            if not isinstance(request2, requests.models.Response):
                return url1, request1.text
            request2.encoding = self.search_encoding(request2.headers, request2.text)[0]
            url2 = clean_link(request2.url)
            if url2 is None:
                return url1, request1.text
            if connection.duplicate_content(request1.text, request2.text):
                tell('Same content: ' + url1 + ' and ' + url2)
                return url2, request2.text
            else:
                return url1, request1.text
        else:
            return url1, request1.text
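

# Minimal usage sketch, not part of the crawler itself; the url below is only
# an illustration and any real run performs a live HTTP request.
if __name__ == '__main__':
    web_connection = WebConnection()
    new_url, code, nofollow, score, all_urls = web_connection.get_code('http://example.com/')
    if isinstance(code, str) and code not in ('ignore', 'no connection'):
        tell('Fetched ' + str(new_url) + ' (score=' + str(score) + ', nofollow=' + str(nofollow) + ')')
    else:
        tell('Nothing to crawl for ' + str(new_url))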