373 lines
13 KiB
Python
Executable File
373 lines
13 KiB
Python
Executable File
#!/usr/bin/python3
|
||
"""
|
||
Simple script that extracts information from Télé 7 jours and TMDB
|
||
to help select and plan the movies you want to record with your Freebox
|
||
"""
|
||
import argparse
|
||
import datetime
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import requests
|
||
import socket
|
||
import textwrap
|
||
import tmdbsimple
|
||
from pyfbx.pyfbx import Fbx
|
||
from bs4 import BeautifulSoup
|
||
from collections import deque
|
||
|
||
|
||
class Movie:
|
||
def __init__(self):
|
||
self.day = ''
|
||
self.title = ''
|
||
self.genre = ''
|
||
self.channel = ''
|
||
self.rating = ''
|
||
self.original_title = ''
|
||
self.overview = ''
|
||
self.good = False
|
||
self.tmdb_id = ''
|
||
self.url = ''
|
||
self.user_selected = False
|
||
self.date = 0
|
||
self.start_time = 0
|
||
self.end_time = 0
|
||
self.year = ''
|
||
|
||
def __str__(self):
|
||
return '{}: {} - {} ({})\n TMDB: {} - {} - {}\n @ {}\n {}'.format(
|
||
'Today' if self.day == '' else self.day,
|
||
self.title,
|
||
self.genre,
|
||
self.channel,
|
||
self.rating,
|
||
self.original_title,
|
||
self.year,
|
||
self.url,
|
||
self.overview
|
||
)
|
||
|
||
def __repr__(self):
|
||
return "Movie <{} (D:{} — Ch:{} – R:{})>".format(
|
||
self.title,
|
||
'Today' if self.day == '' else self.day,
|
||
self.channel,
|
||
self.rating
|
||
)
|
||
|
||
|
||
class TVGuideScraper:
|
||
TV_GUIDE_URL = 'https://programme-tv.nouvelobs.com/programme-free/categorie-film/{}/'
|
||
|
||
def findAllMovies():
|
||
movies = []
|
||
days = deque(['lundi', 'mardi', 'mercredi',
|
||
'jeudi', 'vendredi', 'samedi', 'dimanche'])
|
||
offset = datetime.datetime.today().weekday()
|
||
days.rotate(-offset)
|
||
date = datetime.date.today()
|
||
for day in days:
|
||
movies += TVGuideScraper._getMovies(day, date)
|
||
date += datetime.timedelta(days=1)
|
||
logging.debug('Found the following movies: {}'.format(movies))
|
||
return movies
|
||
|
||
@staticmethod
|
||
def _getMovies(day='', date=datetime.date.today()):
|
||
url = TVGuideScraper.TV_GUIDE_URL.format(day)
|
||
logging.info('Connecting to {}'.format(url))
|
||
r = requests.get(url)
|
||
r.raise_for_status()
|
||
html = BeautifulSoup(r.text, 'html.parser')
|
||
movies = []
|
||
for channel in html.select('.tab_grille'):
|
||
for movietag in channel.select('.cat-film'):
|
||
try:
|
||
movie = Movie()
|
||
movie.title = movietag.select('a.titre')[0].string
|
||
movie.genre = 'Film' # Genre is not available
|
||
movie.channel = channel.select('.logo_chaine_g img')[0]\
|
||
['alt'].replace('Programme ','')
|
||
movie.day = day.title()
|
||
movie.date = datetime.date.strftime(date, '%Y-%m-%d')
|
||
movie.start_time = datetime.datetime.strptime(
|
||
'{} {}'.format(
|
||
movie.date,
|
||
movietag.select('span.t16')[0].string
|
||
),
|
||
'%Y-%m-%d %H.%M'
|
||
)
|
||
duration = TVGuideScraper._parse_duration(
|
||
re.search(r'\((.*) mn\)',movietag.text).group(1)
|
||
)
|
||
movie.end_time = movie.start_time + duration
|
||
|
||
logging.debug('Found movie: {0!r}'.format(movie))
|
||
movies.append(movie)
|
||
except:
|
||
logging.warning('Error parsing movie from tag: {0!r}'.format(movietag))
|
||
|
||
return movies
|
||
|
||
@staticmethod
|
||
def _tag_is_movie(tag):
|
||
"""
|
||
Helper to check if a tag is a movie
|
||
"""
|
||
return (
|
||
tag.has_attr('data-nature')
|
||
and
|
||
tag['data-nature'] == 'films-telefilms'
|
||
)
|
||
|
||
@staticmethod
|
||
def _parse_duration(text):
|
||
minutes = int(text)
|
||
return datetime.timedelta(minutes=minutes)
|
||
|
||
|
||
class FreeboxMoviePlanner:
|
||
def __init__(self, movies, excluded_channels=[], excluded_directory=[]):
|
||
logging.debug('Opening config file: config.json')
|
||
with open('config.json') as config_file:
|
||
self.config = json.load(config_file)
|
||
if(len(self.config['freebox-session-token']) != 64):
|
||
self.createAuthenticationToken()
|
||
tmdbsimple.API_KEY = self.config['tmdb-api']
|
||
self.movies = movies
|
||
self.excluded_directory = excluded_directory
|
||
|
||
logging.info('Opening Freebox session')
|
||
self.freebox = Fbx()
|
||
self.freebox.mksession(
|
||
app_id='FreeboxMoviePlanner',
|
||
token=self.config['freebox-session-token']
|
||
)
|
||
self.getListOfAvailableChannels(excluded_channels)
|
||
self.excludeUnavailableChannels()
|
||
self.excludeTelevisionMovie()
|
||
self.findMoviesOnTMDB()
|
||
self.excludeBadRatings()
|
||
for directory in self.excluded_directory:
|
||
self.excludeLocalMovies(directory)
|
||
self.askForUserSelection()
|
||
self.excludeNotSelected()
|
||
self.programMovies()
|
||
self.checkForConflicts()
|
||
|
||
def createAuthenticationToken(self):
|
||
logging.info('Creating authentication token')
|
||
self.freebox = Fbx()
|
||
hostname = socket.gethostname()
|
||
print("You don't seem to have an authentication token.")
|
||
print("I will now atempt to create one.")
|
||
print("Please go to your Freebox and accept the authentication.")
|
||
token = self.freebox.register(
|
||
"FreeboxMoviePlanner", "FreeboxMoviePlanner", hostname
|
||
)
|
||
self.config['freebox-session-token'] = token
|
||
with open('config.json', 'w') as outfile:
|
||
json.dump(self.config, outfile, indent=4, sort_keys=True)
|
||
|
||
def __repr__(self):
|
||
result = 'FreeboxMoviePlanner <Movies:\n'
|
||
for movie in self.movies:
|
||
result += ' {!r}\n'.format(movie)
|
||
result += '>'
|
||
return result
|
||
|
||
def getListOfAvailableChannels(self, excluded_channels):
|
||
logging.info('Getting the list of available channels')
|
||
self.channels = {}
|
||
for channel in self.freebox.Tv.Getting_the_list_of_channels().values():
|
||
if channel['available']:
|
||
if channel['name'] in excluded_channels:
|
||
logging.debug(
|
||
"Excluding '{}'".format(channel['name'])
|
||
)
|
||
else:
|
||
self.channels[channel['name'].lower()] = channel['uuid']
|
||
else:
|
||
logging.debug("Dropping '{}'".format(channel['name']))
|
||
logging.debug('Got the following channels: {}'.format(self.channels))
|
||
|
||
def printAllMovies(self):
|
||
for movie in self.movies:
|
||
print('{!r}'.format(movie))
|
||
|
||
def askForUserSelection(self):
|
||
for movie in self.movies:
|
||
print()
|
||
print(movie)
|
||
reply = input("Interested? (y)es/(N)o/(q)uit: ")
|
||
if reply.upper() == "Y":
|
||
movie.user_selected = True
|
||
elif reply.upper() == "Q":
|
||
break
|
||
|
||
def findMoviesOnTMDB(self):
|
||
for movie in self.movies:
|
||
tmdb_details = self._findMovieOnTMDB(movie.title)
|
||
if tmdb_details:
|
||
movie.rating = tmdb_details['vote_average']
|
||
movie.original_title = \
|
||
tmdb_details['original_title']
|
||
movie.overview = '\n '.join(textwrap.wrap(
|
||
tmdb_details['overview'], 75)
|
||
)
|
||
movie.tmdb_id = tmdb_details['id']
|
||
movie.good = \
|
||
float(movie.rating) >= self.config['minimum-rating']
|
||
movie.url = 'https://www.themoviedb.org/movie/{}?language={}' \
|
||
.format(movie.tmdb_id, self.config['tmdb-language'])
|
||
try:
|
||
movie.year = datetime.datetime.strptime(
|
||
tmdb_details['release_date'], '%Y-%m-%d'
|
||
).year
|
||
except (ValueError, KeyError):
|
||
logging.warning(
|
||
"No release date for '{!r}'".format(movie)
|
||
)
|
||
pass
|
||
else:
|
||
logging.warning(
|
||
"'{!r}' not found on TMDB!".format(movie)
|
||
)
|
||
|
||
def _findMovieOnTMDB(self, movie):
|
||
logging.info("Searching for '{}' on TMDB".format(movie))
|
||
search = tmdbsimple.Search()
|
||
search.movie(query=movie, language=self.config['tmdb-language'])
|
||
if len(search.results):
|
||
logging.info("Found '{}'".format(
|
||
search.results[0]['title']
|
||
))
|
||
return search.results[0]
|
||
else:
|
||
logging.info("'{}' not found!".format(movie))
|
||
return []
|
||
|
||
def excludeBadRatings(self):
|
||
logging.info('Dropping movies with bad ratings: {}'.format(
|
||
[m for m in self.movies if not m.good]
|
||
))
|
||
self.movies = [m for m in self.movies if m.good]
|
||
logging.debug('Kept {}'.format(self.movies))
|
||
|
||
def excludeUnavailableChannels(self):
|
||
logging.info('Dropping movies on unavailable channels: {}'.format(
|
||
[m for m in self.movies if m.channel not in self.channels]
|
||
))
|
||
self.movies = [m for m in self.movies if m.channel.lower() in self.channels]
|
||
logging.debug('Kept {}'.format(self.movies))
|
||
|
||
def excludeTelevisionMovie(self):
|
||
logging.info('Dropping television movies')
|
||
self.movies = [
|
||
m for m in self.movies if not m.genre.startswith("Téléfilm")
|
||
]
|
||
|
||
def excludeLocalMovies(self, directory):
|
||
(_, _, filenames) = next(os.walk(directory))
|
||
clean = lambda t: re.sub(r"( :|\?)", "", t)
|
||
logging.warning('Dropping movies already recorded: {}'.format(
|
||
[m for m in self.movies if clean(m.title)+'.m2ts' in filenames]
|
||
))
|
||
self.movies = [
|
||
m for m in self.movies if clean(m.title)+'.m2ts' not in filenames
|
||
]
|
||
|
||
def excludeNotSelected(self):
|
||
self.movies = [m for m in self.movies if m.user_selected]
|
||
|
||
def programMovies(self):
|
||
for movie in self.movies:
|
||
logging.debug("Programming '{!r}'".format(movie))
|
||
data = {
|
||
'channel_uuid': self.channels[movie.channel],
|
||
'start': int(movie.start_time.timestamp()),
|
||
'end': int(movie.end_time.timestamp()),
|
||
'name': movie.title,
|
||
'margin_before': 60*self.config['margin-before'],
|
||
'margin_after': 60*self.config['margin-after']
|
||
}
|
||
self.freebox.Pvr.Create_a_precord(data)
|
||
print("Programmed '{!r}'".format(movie))
|
||
|
||
def checkForConflicts(self):
|
||
programmed_movies = self.freebox.Pvr.Getting_the_list_of_precords()
|
||
if programmed_movies:
|
||
conflicting_movies = [m for m in programmed_movies if m['conflict']]
|
||
if conflicting_movies:
|
||
print(
|
||
"\n"
|
||
"!!!!!!!!!\n"
|
||
"!Warning!\n"
|
||
"!!!!!!!!!\n"
|
||
"Conflicting records detected, please "
|
||
"check your Freebox interface\n"
|
||
"http://192.168.1.254/#Fbx.os.app.pvr.app"
|
||
)
|
||
logging.info("Conflicting records detected '{}'".format(
|
||
conflicting_movies
|
||
))
|
||
|
||
|
||
if __name__ == '__main__':
|
||
logging.basicConfig(
|
||
level=logging.WARNING,
|
||
format=' %(asctime)s - %(levelname)s - %(message)s'
|
||
)
|
||
parser = argparse.ArgumentParser(
|
||
description='Schedule movie recordings on your Freebox'
|
||
)
|
||
parser.add_argument(
|
||
'-d', '--day',
|
||
action='store_true',
|
||
help='Search movies for current day only instead of a full week'
|
||
)
|
||
parser.add_argument(
|
||
'-l', '--log',
|
||
action='store_true',
|
||
help='Display more log messages'
|
||
)
|
||
parser.add_argument(
|
||
'-b', '--debug',
|
||
action='store_true',
|
||
help='Display even more log messages'
|
||
)
|
||
parser.add_argument(
|
||
'-e', '--exclude',
|
||
action='append',
|
||
default=[],
|
||
help='Exclude the following Channel'
|
||
)
|
||
parser.add_argument(
|
||
'-x', '--exclude-directory',
|
||
action='append',
|
||
default=[],
|
||
help='''Do not display movies available in the following directory.
|
||
This will prevent you from recording the same movie multiple
|
||
times.'''
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
print("Working the magic, please wait…")
|
||
if args.debug:
|
||
logging.getLogger().setLevel(logging.DEBUG)
|
||
elif args.log:
|
||
logging.getLogger().setLevel(logging.INFO)
|
||
if args.day:
|
||
movies = TVGuideScraper._getMovies()
|
||
else:
|
||
movies = TVGuideScraper.findAllMovies()
|
||
|
||
fmp = FreeboxMoviePlanner(
|
||
movies,
|
||
excluded_channels=args.exclude,
|
||
excluded_directory=args.exclude_directory
|
||
)
|
||
input("Press Enter to continue...")
|