# coding: utf-8 """ This module contains all the code related to fetching and loading flats lists. """ from __future__ import absolute_import, print_function, unicode_literals from builtins import str import arrow import collections import datetime import itertools import json import logging from ratelimit import limits from flatisfy import database from flatisfy import tools from flatisfy.constants import BACKENDS_BY_PRECEDENCE from flatisfy.models import flat as flat_model from flatisfy.models import last_fetch as last_fetch_model LOGGER = logging.getLogger(__name__) try: from woob.capabilities.housing import Query, POSTS_TYPES, HOUSE_TYPES from woob.core.bcall import CallErrors from woob.core.ouiboube import WebNip from woob.tools.json import WoobEncoder except ImportError: LOGGER.error("Woob is not available on your system. Make sure you installed it.") raise class WoobProxy(object): """ Wrapper around Woob ``WebNip`` class, to fetch housing posts without having to spawn a subprocess. """ @staticmethod def version(): """ Get Woob version. :return: The installed Woob version. """ return WebNip.VERSION @staticmethod def restore_decimal_fields(flat): """ Parse fields expected to be in Decimal type to float. They were dumped as str in the JSON dump process. :param flat: A flat dict. :return: A flat dict with Decimal fields converted to float. """ for field in ["area", "cost", "rooms", "bedrooms", "price_per_meter"]: try: flat[field] = float(flat[field]) except (TypeError, ValueError): flat[field] = None except KeyError: pass return flat def __init__(self, config): """ Create a Woob handle and try to load the modules. :param config: A config dict. """ # Default backends if not config["backends"]: backends = BACKENDS_BY_PRECEDENCE else: backends = config["backends"] # Create base WebNip object self.webnip = WebNip(modules_path=config["modules_path"]) # Create backends self.backends = [] for module in backends: try: self.backends.append( self.webnip.load_backend(module, module, params={}) ) except Exception as exc: raise Exception('Unable to load module ' + module) from exc def __enter__(self): return self def __exit__(self, *args): self.webnip.deinit() def build_queries(self, constraints_dict): """ Build Woob ``woob.capabilities.housing.Query`` objects from the constraints defined in the configuration. Each query has at most 3 cities, to comply with housing websites limitations. :param constraints_dict: A dictionary of constraints, as defined in the config. :return: A list of Woob ``woob.capabilities.housing.Query`` objects. Returns ``None`` if an error occurred. """ queries = [] # First, find all matching cities for the postal codes in constraints matching_cities = [] for postal_code in constraints_dict["postal_codes"]: try: for city in self.webnip.do("search_city", postal_code): matching_cities.append(city) except CallErrors as exc: # If an error occured, just log it LOGGER.error( ("An error occured while building query for postal code %s: %s"), postal_code, str(exc), ) if not matching_cities: # If postal code gave no match, warn the user LOGGER.warn("Postal code %s could not be matched with a city.", postal_code) # Remove "TOUTES COMMUNES" entry which are duplicates of the individual # cities entries in Logicimmo module. matching_cities = [ city for city in matching_cities if not (city.backend == "logicimmo" and city.name.startswith("TOUTES COMMUNES")) ] # Then, build queries by grouping cities by at most 3 for cities_batch in tools.batch(matching_cities, 3): query = Query() query.cities = list(cities_batch) try: query.house_types = [ getattr(HOUSE_TYPES, house_type.upper()) for house_type in constraints_dict["house_types"] ] except AttributeError: LOGGER.error("Invalid house types constraint.") return None try: query.type = getattr(POSTS_TYPES, constraints_dict["type"].upper()) except AttributeError: LOGGER.error("Invalid post type constraint.") return None query.area_min = constraints_dict["area"][0] query.area_max = constraints_dict["area"][1] query.cost_min = constraints_dict["cost"][0] query.cost_max = constraints_dict["cost"][1] query.nb_rooms = constraints_dict["rooms"][0] queries.append(query) return queries def query( self, query, max_entries=None, store_personal_data=False, force_fetch_all=False, last_fetch_by_backend=None ): """ Fetch the housings posts matching a given Woob query. :param query: A Woob `woob.capabilities.housing.Query`` object. :param max_entries: Maximum number of entries to fetch. :param store_personal_data: Whether personal data should be fetched from housing posts (phone number etc). :param force_fetch_all: Whether to force fetching all available flats or only diff from last fetch (based on timestamps). :param last_fetch_by_backend: A dict mapping all backends to last fetch datetimes. :return: The matching housing posts, dumped as a list of JSON objects. """ if last_fetch_by_backend is None: last_fetch_by_backend = {} housings = [] # List the useful backends for this specific query useful_backends = [x.backend for x in query.cities] try: for housing in itertools.islice( self.webnip.do( "search_housings", query, # Only run the call on the required backends. # Otherwise, Woob is doing weird stuff and returning # nonsense. backends=[x for x in self.backends if x.name in useful_backends], ), max_entries, ): if not force_fetch_all: # Check whether we should continue iterating or not last_fetch_datetime = last_fetch_by_backend.get(housing.backend) if last_fetch_datetime and housing.date and housing.date < last_fetch_datetime: LOGGER.info( 'Done iterating till last fetch (housing.date=%s, last_fetch=%s). Stopping iteration.', housing.date, last_fetch_datetime ) break if not store_personal_data: housing.phone = None housings.append(json.dumps(housing, cls=WoobEncoder)) except CallErrors as exc: # If an error occured, just log it LOGGER.error("An error occured while fetching the housing posts: %s", str(exc)) return housings def info(self, full_flat_id, store_personal_data=False): """ Get information (details) about an housing post. :param full_flat_id: A Woob housing post id, in complete form (ID@BACKEND) :param store_personal_data: Whether personal data should be fetched from housing posts (phone number etc). :return: The details in JSON. """ flat_id, backend_name = full_flat_id.rsplit("@", 1) try: backend = next(backend for backend in self.backends if backend.name == backend_name) except StopIteration: LOGGER.error("Backend %s is not available.", backend_name) return "{}" try: housing = backend.get_housing(flat_id) if not store_personal_data: # Ensure phone is cleared housing.phone = None else: # Ensure phone is fetched backend.fillobj(housing, "phone") # Otherwise, we miss the @backend afterwards housing.id = full_flat_id return json.dumps(housing, cls=WoobEncoder) except Exception as exc: # pylint: disable=broad-except # If an error occured, just log it LOGGER.error("An error occured while fetching housing %s: %s", full_flat_id, str(exc)) return "{}" def fetch_flats(config): """ Fetch the available flats using the Woob config. :param config: A config dict. :return: A dict mapping constraint in config to all available matching flats. """ fetched_flats = {} # Get last fetch datetimes for all constraints / backends get_session = database.init_db(config["database"], config["search_index"]) with get_session() as session: last_fetch = collections.defaultdict(dict) for item in session.query(last_fetch_model.LastFetch).all(): last_fetch[item.constraint_name][item.backend] = item.last_fetch # Do the actual fetching for constraint_name, constraint in config["constraints"].items(): LOGGER.info("Loading flats for constraint %s...", constraint_name) with WoobProxy(config) as woob_proxy: queries = woob_proxy.build_queries(constraint) housing_posts = [] for query in queries: housing_posts.extend( woob_proxy.query( query, config["max_entries"], config["store_personal_data"], config["force_fetch_all"], last_fetch[constraint_name] ) ) housing_posts = [json.loads(flat) for flat in housing_posts] # Update last_fetch last_fetch_by_backends = collections.defaultdict(lambda: None) for flat in housing_posts: backend = flat['id'].split('@')[-1] if ( last_fetch_by_backends[backend] is None or last_fetch_by_backends[backend] < flat['date'] ): last_fetch_by_backends[backend] = flat['date'] for backend in last_fetch_by_backends: last_fetch_in_db = session.query(last_fetch_model.LastFetch).where( last_fetch_model.LastFetch.constraint_name == constraint_name, last_fetch_model.LastFetch.backend == backend ).first() if last_fetch_in_db: last_fetch_in_db.last_fetch = arrow.get( last_fetch_by_backends[backend] ).date() elif last_fetch_by_backends[backend]: last_fetch_in_db = last_fetch_model.LastFetch( constraint_name=constraint_name, backend=backend, last_fetch=arrow.get(last_fetch_by_backends[backend]).date() ) session.add(last_fetch_in_db) session.commit() housing_posts = housing_posts[: config["max_entries"]] LOGGER.info("Fetched %d flats.", len(housing_posts)) constraint_flats_list = [WoobProxy.restore_decimal_fields(flat) for flat in housing_posts] fetched_flats[constraint_name] = constraint_flats_list return fetched_flats @limits(calls=10, period=60) def fetch_details_rate_limited(config, flat_id): """ Limit flats fetching to at most 10 calls per minute to avoid rate banning """ return fetch_details(config, flat_id) def fetch_details(config, flat_id): """ Fetch the additional details for a flat using Woob. :param config: A config dict. :param flat_id: ID of the flat to fetch details for. :return: A flat dict with all the available data. """ with WoobProxy(config) as woob_proxy: LOGGER.info("Loading additional details for flat %s.", flat_id) woob_output = woob_proxy.info(flat_id, config["store_personal_data"]) flat_details = json.loads(woob_output) flat_details = WoobProxy.restore_decimal_fields(flat_details) LOGGER.info("Fetched details for flat %s.", flat_id) return flat_details def load_flats_from_file(json_file, config): """ Load a dumped flats list from JSON file. :param json_file: The file to load housings list from. :return: A dict mapping constraint in config to all available matching flats. .. note:: As we do not know which constraint is met by a given flat, all the flats are returned for any available constraint, and they will be filtered out afterwards. """ flats_list = [] try: LOGGER.info("Loading flats list from file %s", json_file) with open(json_file, "r") as fh: flats_list = json.load(fh) LOGGER.info("Found %d flats.", len(flats_list)) except (IOError, ValueError): LOGGER.error("File %s is not a valid dump file.", json_file) return {constraint_name: flats_list for constraint_name in config["constraints"]} def load_flats_from_db(config): """ Load flats from database. :param config: A config dict. :return: A dict mapping constraint in config to all available matching flats. """ get_session = database.init_db(config["database"], config["search_index"]) loaded_flats = collections.defaultdict(list) with get_session() as session: for flat in session.query(flat_model.Flat).all(): loaded_flats[flat.flatisfy_constraint].append(flat.json_api_repr()) return loaded_flats