# -*- coding: utf-8 -*-
""" This module contains classes:
HostsEntry:
A representation of a hosts file entry, i.e. a line containing an IP address
and name(s), a comment, or a blank line/line separator.
Hosts:
A representation of a hosts file, e.g. /etc/hosts and
c:\\\\windows\\\\system32\\\\drivers\\\\etc\\\\hosts for a linux or MS windows
based machine respectively. Each entry being represented as an instance
of the HostsEntry class.
"""
import sys
try:
from urllib.request import urlopen
except ImportError: # pragma: no cover
from urllib2 import urlopen
from python_hosts.utils import (is_ipv4, is_ipv6, is_readable, valid_hostnames,
dedupe_list)
from python_hosts.exception import (InvalidIPv6Address, InvalidIPv4Address,
UnableToWriteHosts)
[docs]class HostsEntry(object):
""" An entry in a hosts file. """
__slots__ = ['entry_type', 'address', 'comment', 'names']
def __init__(self,
entry_type=None,
address=None,
comment=None,
names=None):
"""
Initialise an instance of a Hosts file entry
:param entry_type: ipv4 | ipv6 | comment | blank
:param address: The ipv4 or ipv6 address belonging to the instance
:param comment: The comment belonging to the instance
:param names: The names that resolve to the specified address
:return: None
"""
if not entry_type or entry_type not in ('ipv4',
'ipv6',
'comment',
'blank'):
raise Exception('entry_type invalid or not specified')
if entry_type == 'comment' and not comment:
raise Exception('entry_type comment supplied without value.')
if entry_type == 'ipv4':
if not all((address, names)):
raise Exception('Address and Name(s) must be specified.')
if not is_ipv4(address):
raise InvalidIPv4Address()
if entry_type == 'ipv6':
if not all((address, names)):
raise Exception('Address and Name(s) must be specified.')
if not is_ipv6(address):
raise InvalidIPv6Address()
self.entry_type = entry_type
self.address = address
self.comment = comment
self.names = names
[docs] def is_real_entry(self):
return self.entry_type in ('ipv4', 'ipv6')
def __repr__(self):
return "HostsEntry(entry_type=\'{0}\', address=\'{1}\', " \
"comment={2}, names={3})".format(self.entry_type,
self.address,
self.comment,
self.names)
def __str__(self):
if self.entry_type in ('ipv4', 'ipv6'):
return "TYPE={0}, ADDR={1}, NAMES={2}".format(self.entry_type,
self.address,
" ".join(self.names))
elif self.entry_type == 'comment':
return "TYPE = {0}, COMMENT = {1}".format(self.entry_type, self.comment)
elif self.entry_type == 'blank':
return "TYPE = {0}".format(self.entry_type)
[docs] @staticmethod
def get_entry_type(hosts_entry=None):
"""
Return the type of entry for the line of hosts file passed
:param hosts_entry: A line from the hosts file
:return: 'comment' | 'blank' | 'ipv4' | 'ipv6'
"""
if hosts_entry and isinstance(hosts_entry, str):
entry = hosts_entry.strip()
if not entry or not entry[0] or entry[0] == "\n":
return 'blank'
if entry[0] == "#":
return 'comment'
entry_chunks = entry.split()
if is_ipv6(entry_chunks[0]):
return 'ipv6'
if is_ipv4(entry_chunks[0]):
return 'ipv4'
[docs] @staticmethod
def str_to_hostentry(entry):
"""
Transform a line from a hosts file into an instance of HostsEntry
:param entry: A line from the hosts file
:return: An instance of HostsEntry
"""
line_parts = entry.strip().split()
if is_ipv4(line_parts[0]) and valid_hostnames(line_parts[1:]):
return HostsEntry(entry_type='ipv4',
address=line_parts[0],
names=line_parts[1:])
elif is_ipv6(line_parts[0]) and valid_hostnames(line_parts[1:]):
return HostsEntry(entry_type='ipv6',
address=line_parts[0],
names=line_parts[1:])
else:
return False
[docs]class Hosts(object):
""" A hosts file. """
__slots__ = ['entries', 'hosts_path']
def __init__(self, path=None):
"""
Initialise an instance of a hosts file
:param path: The filesystem path of the hosts file to manage
:return: None
"""
self.entries = []
if path:
self.hosts_path = path
else:
self.hosts_path = self.determine_hosts_path()
self.populate_entries()
def __repr__(self):
return 'Hosts(hosts_path=\'{0}\', entries={1})'.format(self.hosts_path, self.entries)
def __str__(self):
output = ('hosts_path={0}, '.format(self.hosts_path))
for entry in self.entries:
output += str(entry)
return output
[docs] def count(self):
""" Get a count of the number of host entries
:return: The number of host entries
"""
return len(self.entries)
[docs] @staticmethod
def determine_hosts_path(platform=None):
"""
Return the hosts file path based on the supplied
or detected platform.
:param platform: a string used to identify the platform
:return: detected filesystem path of the hosts file
"""
if not platform:
platform = sys.platform
if platform.startswith('win'):
result = r"c:\windows\system32\drivers\etc\hosts"
return result
else:
return '/etc/hosts'
[docs] def write(self, path=None):
"""
Write all of the HostsEntry instances back to the hosts file
:param path: override the write path
:return: Dictionary containing counts
"""
written_count = 0
comments_written = 0
blanks_written = 0
ipv4_entries_written = 0
ipv6_entries_written = 0
if path:
output_file_path = path
else:
output_file_path = self.hosts_path
try:
with open(output_file_path, 'w') as hosts_file:
for written_count, line in enumerate(self.entries):
if line.entry_type == 'comment':
hosts_file.write(line.comment + "\n")
comments_written += 1
if line.entry_type == 'blank':
hosts_file.write("\n")
blanks_written += 1
if line.entry_type == 'ipv4':
hosts_file.write(
"{0}\t{1}\n".format(
line.address,
' '.join(line.names),
)
)
ipv4_entries_written += 1
if line.entry_type == 'ipv6':
hosts_file.write(
"{0}\t{1}\n".format(
line.address,
' '.join(line.names), ))
ipv6_entries_written += 1
except:
raise UnableToWriteHosts()
return {'total_written': written_count + 1,
'comments_written': comments_written,
'blanks_written': blanks_written,
'ipv4_entries_written': ipv4_entries_written,
'ipv6_entries_written': ipv6_entries_written}
[docs] @staticmethod
def get_hosts_by_url(url=None):
"""
Request the content of a URL and return the response
:param url: The URL of the hosts file to download
:return: The content of the passed URL
"""
response = urlopen(url)
return response.read()
[docs] def exists(self, address=None, names=None, comment=None):
"""
Determine if the supplied address and/or names, or comment, exists in a HostsEntry within Hosts
:param address: An ipv4 or ipv6 address to search for
:param names: A list of names to search for
:param comment: A comment to search for
:return: True if a supplied address, name, or comment is found. Otherwise, False.
"""
for entry in self.entries:
if entry.entry_type in ('ipv4', 'ipv6'):
if address and address == entry.address:
return True
if names:
for name in names:
if name in entry.names:
return True
elif entry.entry_type == 'comment' and entry.comment == comment:
return True
return False
[docs] def remove_all_matching(self, address=None, name=None):
"""
Remove all HostsEntry instances from the Hosts object
where the supplied ip address or name matches
:param address: An ipv4 or ipv6 address
:param name: A host name
:return: None
"""
if self.entries:
if address and name:
func = lambda entry: not entry.is_real_entry() or (entry.address != address and name not in entry.names)
elif address:
func = lambda entry: not entry.is_real_entry() or entry.address != address
elif name:
func = lambda entry: not entry.is_real_entry() or name not in entry.names
else:
raise ValueError('No address or name was specified for removal.')
self.entries = list(filter(func, self.entries))
[docs] def find_all_matching(self, address=None, name=None):
"""
Return all HostsEntry instances from the Hosts object
where the supplied ip address or name matches
:param address: An ipv4 or ipv6 address
:param name: A host name
:return: HostEntry instances
"""
results = []
if self.entries:
for entry in self.entries:
if not entry.is_real_entry():
continue
if address and name:
if address == entry.address and name in entry.names:
results.append(entry)
elif address and address == entry.address:
results.append(entry)
elif name in entry.names:
results.append(entry)
return results
[docs] def import_url(self, url=None, force=None):
"""
Read a list of host entries from a URL, convert them into instances of HostsEntry and
then append to the list of entries in Hosts
:param url: The URL of where to download a hosts file
:return: Counts reflecting the attempted additions
"""
file_contents = self.get_hosts_by_url(url=url).decode('utf-8')
file_contents = file_contents.rstrip().replace('^M', '\n')
file_contents = file_contents.rstrip().replace('\r\n', '\n')
lines = file_contents.split('\n')
skipped = 0
import_entries = []
for line in lines:
stripped_entry = line.strip()
if (not stripped_entry) or (stripped_entry.startswith('#')):
skipped += 1
else:
line = line.partition('#')[0]
line = line.rstrip()
import_entry = HostsEntry.str_to_hostentry(line)
if import_entry:
import_entries.append(import_entry)
add_result = self.add(entries=import_entries, force=force)
write_result = self.write()
return {'result': 'success',
'skipped': skipped,
'add_result': add_result,
'write_result': write_result}
[docs] def import_file(self, import_file_path=None):
"""
Read a list of host entries from a file, convert them into instances
of HostsEntry and then append to the list of entries in Hosts
:param import_file_path: The path to the file containing the host entries
:return: Counts reflecting the attempted additions
"""
skipped = 0
invalid_count = 0
if is_readable(import_file_path):
import_entries = []
with open(import_file_path, 'r') as infile:
for line in infile:
stripped_entry = line.strip()
if (not stripped_entry) or (stripped_entry.startswith('#')):
skipped += 1
else:
line = line.partition('#')[0]
line = line.rstrip()
import_entry = HostsEntry.str_to_hostentry(line)
if import_entry:
import_entries.append(import_entry)
else:
invalid_count += 1
add_result = self.add(entries=import_entries)
write_result = self.write()
return {'result': 'success',
'skipped': skipped,
'invalid_count': invalid_count,
'add_result': add_result,
'write_result': write_result}
else:
return {'result': 'failed',
'message': 'Cannot read: file {0}.'.format(import_file_path)}
[docs] def add(self, entries=None, force=False, allow_address_duplication=False, merge_names=False):
"""
Add instances of HostsEntry to the instance of Hosts.
:param entries: A list of instances of HostsEntry
:param force: Remove matching before adding
:param allow_address_duplication: Allow using multiple entries for same address
:param merge_names: Merge names where address already exists
:return: The counts of successes and failures
"""
ipv4_count = 0
ipv6_count = 0
comment_count = 0
invalid_count = 0
duplicate_count = 0
replaced_count = 0
import_entries = []
existing_addresses = [x.address for x in self.entries if x.address]
existing_names = []
for item in self.entries:
if item.names:
existing_names.extend(item.names)
existing_names = dedupe_list(existing_names)
for entry in entries:
if entry.entry_type == 'comment':
entry.comment = entry.comment.strip()
if entry.comment[0] != "#":
entry.comment = "# " + entry.comment
import_entries.append(entry)
elif entry.address in ('0.0.0.0', '127.0.0.1') or allow_address_duplication:
# Allow duplicates entries for addresses used for adblocking
if set(entry.names).intersection(existing_names):
if force:
for name in entry.names:
self.remove_all_matching(name=name)
import_entries.append(entry)
else:
duplicate_count += 1
else:
import_entries.append(entry)
elif entry.address in existing_addresses:
if not any((force, merge_names)):
duplicate_count += 1
elif merge_names:
# get the last entry with matching address
entry_names = list()
for existing_entry in self.entries:
if entry.address == existing_entry.address:
entry_names = existing_entry.names
break
# merge names with that entry
merged_names = list(set(entry.names + entry_names))
# remove all matching
self.remove_all_matching(address=entry.address)
# append merged entry
entry.names = merged_names
import_entries.append(entry)
elif force:
self.remove_all_matching(address=entry.address)
replaced_count += 1
import_entries.append(entry)
elif set(entry.names).intersection(existing_names):
if not force:
duplicate_count += 1
else:
for name in entry.names:
self.remove_all_matching(name=name)
replaced_count += 1
import_entries.append(entry)
else:
import_entries.append(entry)
for item in import_entries:
if item.entry_type == 'comment':
comment_count += 1
self.entries.append(item)
elif item.entry_type == 'ipv4':
ipv4_count += 1
self.entries.append(item)
elif item.entry_type == 'ipv6':
ipv6_count += 1
self.entries.append(item)
return {'comment_count': comment_count,
'ipv4_count': ipv4_count,
'ipv6_count': ipv6_count,
'invalid_count': invalid_count,
'duplicate_count': duplicate_count,
'replaced_count': replaced_count}
[docs] def populate_entries(self):
"""
Called by the initialiser of Hosts. This reads the entries from the local hosts file,
converts them into instances of HostsEntry and adds them to the Hosts list of entries.
:return: None
"""
try:
with open(self.hosts_path, 'r') as hosts_file:
hosts_entries = [line for line in hosts_file]
for hosts_entry in hosts_entries:
entry_type = HostsEntry.get_entry_type(hosts_entry)
if entry_type == "comment":
hosts_entry = hosts_entry.replace("\r", "")
hosts_entry = hosts_entry.replace("\n", "")
self.entries.append(HostsEntry(entry_type="comment",
comment=hosts_entry))
elif entry_type == "blank":
self.entries.append(HostsEntry(entry_type="blank"))
elif entry_type in ("ipv4", "ipv6"):
chunked_entry = hosts_entry.split()
stripped_name_list = [name.strip() for name in chunked_entry[1:]]
self.entries.append(
HostsEntry(
entry_type=entry_type,
address=chunked_entry[0].strip(),
names=stripped_name_list))
except IOError:
return {'result': 'failed',
'message': 'Cannot read: {0}.'.format(self.hosts_path)}