Source code for bibliometa.graph.similarity

# !/usr/bin/python
# -*- coding: utf-8 -*-

"""This module provides a class for similarity function definitions and similarity calculations."""

import csv
import itertools
from collections import OrderedDict

from loguru import logger
from scipy.special import binom
from tqdm.auto import tqdm

from bibliometa.utils.utils import MainUtils


class Similarity:
    """Define and calculate different types of similarity.

    :meth:`calculate` compares every pair of corpus entries with the similarity functions listed in the
    configuration and writes the result rows to a CSV file. Ready-made similarity functions live in the nested
    :class:`Functions` class.
    """

    @staticmethod
    def calculate(corpus, config):
        """Calculate similarity between data sets.

        Every unordered pair of corpus keys is compared. A pair is kept only if at least one similarity function
        returned a value > 0. Each kept row has the layout
        ``[pair_index, key1, key2, <one value per similarity function>]``, where ``pair_index`` counts all
        examined pairs (so the indices of the kept rows are not necessarily consecutive).

        :param corpus: Graph corpus that contains the data on which similarity calculation will be based
        :type corpus: `dict`
        :param config: Configuration object; ``sim_functions`` is read here, the output settings are read by
            :meth:`_write_results_and_progress`
        :type config: `bibliometa.configuration.Config`
        """
        results = []
        record_count = 0

        # calculate size of progress bar: one tick per `factor` examined pairs
        combinations = binom(len(corpus), 2)
        factor = MainUtils.get_factor(combinations)
        progress_max_value = int(combinations / factor)

        # Calculate similarity for each node--node pair in the corpus.
        # The ``with`` statement closes the progress bar, no explicit close() is needed.
        with tqdm(total=progress_max_value) as progressbar:
            for key1, key2 in itertools.combinations(corpus, 2):
                similarity_dict = Similarity._get_similarity(
                    set(corpus[key1]),
                    set(corpus[key2]),
                    config.sim_functions
                )
                values = list(similarity_dict.values())

                # consider this combination only if at least one similarity function returned a value > 0
                if any(x > 0 for x in values):
                    results.append([record_count, key1, key2] + values)

                # get progress
                record_count += 1
                if record_count % factor == 0:
                    progressbar.update()

        # TODO: implement chunks for larger data sets? If yes, also implement saving temporary results/progress
        progress = 1.0
        Similarity._write_results_and_progress(results, config, progress)

    @staticmethod
    def _get_similarity(a, b, sim_functions):
        """Get similarity between a and b for different similarity functions.

        Each entry of ``sim_functions`` is a dict with the keys ``"name"`` and ``"function"`` and an optional
        ``"args"`` dict that may provide ``"f"`` and/or ``"t"``. Missing arguments fall back to the defaults
        ``f=None`` and ``t=0`` *for every entry individually*; arguments of one entry never carry over to the
        next one.

        :param a: Set of values for item a
        :type a: `set`
        :param b: Set of values for item b
        :type b: `set`
        :param sim_functions: Similarity functions
        :type sim_functions: `list` of `dict`
        :return: OrderedDict with similarity function names as keys and their values
        :rtype: `OrderedDict`
        """
        d = OrderedDict()

        for sim_func in sim_functions:
            # Resolve the arguments per entry so that the result does not depend on the order of the entries.
            args = sim_func.get("args") or {}
            f = args.get("f")  # value or function that yields the similarity value
            t = args.get("t", 0)  # threshold
            d[sim_func["name"]] = sim_func["function"](a, b, f, t)

        return d

    @staticmethod
    def _write_results_and_progress(results, config, progress):
        """Write (temporary) results and progress to the appropriate files.

        The results go to a CSV file derived from ``config.o``, the progress value goes to a file derived from
        ``config.log`` whose name carries an additional ``_progress`` marker.

        :param results: List of similarity results for node-node pairs
        :type results: `list`
        :param config: Configuration object
        :type config: `bibliometa.configuration.Config`
        :param progress: Progress values
        :type progress: `float`
        """
        filename, suffix, ext = MainUtils.get_file_info(config.o, suffix=config.name)
        results_path = filename + suffix + ext
        with open(results_path, "w", newline="") as f:
            wr = csv.writer(f, delimiter=config.csv_sep)
            wr.writerows(results)
        logger.info(f"(Temporary) Results written to file {results_path}.")

        filename, suffix, ext = MainUtils.get_file_info(config.log, suffix=config.name)
        progress_path = filename + suffix + "_progress" + ext
        with open(progress_path, "w", newline="") as f:
            f.write(str(progress))
        # Report the file that was actually written (including the "_progress" marker).
        logger.info(f"Progress written to file {progress_path}.")

    class Functions:
        """This class contains predefined similarity functions.

        All functions share the signature ``(a, b, f, t=0)`` so that they can be called uniformly by
        :meth:`Similarity._get_similarity`.
        """

        @staticmethod
        def mint(a, b, f, t=0):
            """a and b are considered similar if the size of their intersection is greater than or equal to t.

            :param a: Set of values for item a
            :type a: `set`
            :param b: Set of values for item b
            :type b: `set`
            :param f: This value (or the result of calling ``f(a, b)``) will be returned if a and b are similar
            :type f: function or `int` or `float`
            :param t: Threshold for the intersection size
            :type t: `int`
            :return: Similarity value, 0 if a and b are not similar
            :rtype: `float` or `int`
            :raise ValueError: If f is neither a function nor an `int` or `float`
            """
            if callable(f):
                return f(a, b) if len(a.intersection(b)) >= t else 0
            if isinstance(f, (int, float)):
                return f if len(a.intersection(b)) >= t else 0
            raise ValueError("Parameter 'f' is neither function nor int or float!")

        @staticmethod
        def jaccard(a, b, f, t=0):
            """The Jaccard Index: size of the intersection of a and b divided by the size of their union.

            The index is only reported if the union of a and b has more than t elements, otherwise 0 is
            returned. If a and b are both empty, the index is defined as 0.0 here instead of raising a
            `ZeroDivisionError`.

            :param a: Set of values for item a
            :type a: `set`
            :param b: Set of values for item b
            :type b: `set`
            :param f: If callable, ``f(index)`` is returned; if a number, the plain index is returned (the
                number itself is not used)
            :type f: function or `int` or `float`
            :param t: Size threshold: the union of a and b must contain more than t elements
            :type t: `int`
            :return: Similarity value, 0 if the size threshold is not exceeded
            :rtype: `float` or `int`
            :raise ValueError: If f is neither a function nor an `int` or `float`
            """
            # NOTE(review): earlier documentation described t as a threshold on the index itself; the
            # comparison against the union size is kept as implemented -- confirm which one is intended.
            intersection = len(a.intersection(b))
            union = len(a) + len(b) - intersection
            jacc = intersection / union if union else 0.0

            if callable(f):
                return f(jacc) if union > t else 0
            if isinstance(f, (int, float)):
                return jacc if union > t else 0
            raise ValueError("Parameter 'f' is neither function nor int or float!")

        @staticmethod
        def overlap(a, b, f, t=0):
            """The overlap score: size of the intersection of a and b divided by the size of the smaller set.

            The score is only reported if the smaller one of a and b has more than t elements, otherwise 0 is
            returned. If a or b is empty, the score is defined as 0.0 here instead of raising a
            `ZeroDivisionError`.

            :param a: Set of values for item a
            :type a: `set`
            :param b: Set of values for item b
            :type b: `set`
            :param f: If callable, ``f(score)`` is returned; if a number, the plain score is returned (the
                number itself is not used)
            :type f: function or `int` or `float`
            :param t: Size threshold: the smaller set must contain more than t elements
            :type t: `int`
            :return: Similarity value, 0 if the size threshold is not exceeded
            :rtype: `float` or `int`
            :raise ValueError: If f is neither a function nor an `int` or `float`
            """
            # NOTE(review): earlier documentation described t as a threshold on the score itself; the
            # comparison against the smaller set size is kept as implemented -- confirm which one is intended.
            intersection = len(a.intersection(b))
            smaller = min(len(a), len(b))
            ovlp = intersection / smaller if smaller else 0.0

            if callable(f):
                return f(ovlp) if smaller > t else 0
            if isinstance(f, (int, float)):
                return ovlp if smaller > t else 0
            raise ValueError("Parameter 'f' is neither function nor int or float!")