# !/usr/bin/python
# -*- coding: utf-8 -*-
"""This module provides functions for analysing graphs."""
import collections
import os
import sys
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
from loguru import logger
from tqdm.auto import tqdm
from bibliometa.config import LOGGING_FORMAT, GRAPH_ANALYSIS_CONFIG_DEFAULT
from bibliometa.configuration import BibliometaConfiguration
from bibliometa.graph.utils import load_graph, save_file
from bibliometa.utils.utils import DictUtils, MainUtils
[docs]class GraphAnalysis(BibliometaConfiguration):
"""The :class:`~bibliometa.graph.analysis.GraphAnalysis` allows to configure and run the graph analysis of
a graph corpus.
It extends the abstract :class:`~bibliometa.configuration.BibliometaConfiguration` class.
"""
def __init__(self, **kwargs):
    """Create a :class:`~bibliometa.graph.analysis.GraphAnalysis` instance.

    The instance is initialised through the parent class with
    `GRAPH_ANALYSIS_CONFIG_DEFAULT` plus any keyword arguments given here.

    :param **kwargs: Arbitrary keyword arguments that become configuration keys and values.
        For example, `verbose=True` makes `self.config.verbose` return `True`. Besides passing
        values at construction time, configuration can also be changed later by calling the
        `set_config` method on the instance.
    """
    defaults = GRAPH_ANALYSIS_CONFIG_DEFAULT
    super().__init__(defaults, **kwargs)
def _update_config(self):
    """Derive the configuration id, validate the configuration and load the graph.

    Steps, in order:

    1. Set `config_id` from the `n`, `e`, `sim` and `t` configuration values.
    2. Verify that similarity functions are configured.
    3. Create the parent directories of the `img` and `graphml` paths (best effort).
    4. Verify that a GraphML path is set when a GraphML file should be created.
    5. Reset the result list and load the graph via `load_graph`.

    :raise ValueError: If `self.config.sim_functions` or `self.config.sim` is empty, or if
        `self.config.create_graphml` is truthy while `self.config.graphml` is not set.
    """
    self.set_config(config_id=f"{self.config.n}_{self.config.e}_{self.config.sim}_{self.config.t}")
    if len(self.config.sim_functions) == 0 or len(self.config.sim) == 0:
        raise ValueError("No similarity function(s) given! At least one similarity function must be provided.")

    # Directory creation stays best effort (a failure must not abort the analysis here),
    # but failures are reported instead of being swallowed silently.
    for key in ("img", "graphml"):
        path = None
        try:
            path = getattr(self.config, key, None)
            directory = os.path.dirname(path) if path else ""
            # An empty dirname means "current directory" (or no path at all): nothing to create.
            if directory:
                # exist_ok avoids the check-then-create race of os.path.exists + os.makedirs.
                os.makedirs(directory, exist_ok=True)
        except (AttributeError, KeyError, TypeError, OSError) as err:
            logger.warning(f"Could not create directory for configuration parameter '{key}' ({path!r}): {err}")

    if self.config.create_graphml and not self.config.graphml:
        raise ValueError("Configuration parameter 'graphml' must be set if parameter 'create_graphml' is set "
                         "to True, otherwise no GraphML file can be created!")

    self._results = []
    # Load graph from GraphML file, if existent; otherwise create GraphML file from similarity file
    self.g = load_graph(self.config)
def start(self):
    """Start the analysis.

    Configures the log sinks, refreshes the configuration (which also loads the graph) and then
    runs every analysis step whose key is contained in `self.config.types`. Steps are executed in
    the fixed order defined below, independent of the order of `self.config.types`. After each
    executed step the collected result lines are written to the output file and the progress bar
    is advanced by one.
    """
    # Set up logging
    log_name, log_suffix, log_ext = MainUtils.get_file_info(self.config.log, suffix=self.config.name)
    logger.remove()
    logger.add(log_name + log_suffix + log_ext, format=LOGGING_FORMAT, level=self.config.log_level_file)
    if self.config.verbose:
        logger.add(sys.stderr, level=self.config.log_level_std)

    self._update_config()
    logger.info(f"Start graph analysis for configuration {self.config.config_id}.")

    out_name, out_suffix, out_ext = MainUtils.get_file_info(self.config.o, suffix=self.config.name)
    out_path = out_name + out_suffix + out_ext

    def record(line):
        # Keep the line as a result and mirror it to the log.
        self._results.append(line)
        logger.info(line)

    def record_statistics(metric, values):
        # Mean, minimum, maximum and standard deviation of the given values, in that order.
        reducers = (("Average", np.mean), ("Minimum", np.min), ("Maximum", np.max), ("Stdev", np.std))
        for label, reducer in reducers:
            record(f"{label} {metric}: {reducer(list(values))}")

    # (configuration key, result label, bound method) for the simple scalar metrics
    basic_metrics = (
        ("node_count", "Nodes", self._node_count),
        ("edge_count", "Edges", self._edge_count),
        ("component_count", "Components", self._component_count),
        ("max_component", "Size of largest component", self._max_component),
        ("avg_degree", "Average Degree", self._avg_degree),
    )
    global_metrics = (
        ("density", "Density", self._density),
        ("diameter", "Diameter", self._diameter),
        ("average_shortest_path", "Average shortest path", self._avg_shortest_path),
        ("global_clustering_coefficient", "Global clustering coefficient", self._global_cluster_coefficient),
        ("graph_clique_number", "Graph clique number", self._clique_number),
        ("number_of_cliques", "Number of cliques", self._nr_of_cliques),
    )

    with tqdm(total=len(self.config.types)) as progressbar:

        def checkpoint():
            # Persist everything collected so far and advance the progress bar.
            save_file(out_path, self._results)
            progressbar.update()

        # Calculate basic metrics
        for key, label, compute in basic_metrics:
            if key in self.config.types:
                record(f"{label}: {compute()}")
                checkpoint()

        # Plot degree distribution
        if "degree_distribution" in self.config.types:
            logger.info("Start plotting degree distributions.")
            self._degree_distribution(weight='weight')
            logger.info("Finished plotting degree distributions.")
            checkpoint()

        # Top degree centrality nodes
        if "top_dc_nodes" in self.config.types:
            logger.info("Getting top Degree Centrality nodes.")
            self._top_dc_nodes()
            checkpoint()

        # Degree Centrality distributions
        if "degree_centrality_distribution" in self.config.types:
            logger.info("Calculating Degree Centrality distributions.")
            record_statistics("degree centrality", self._degree_centrality())
            checkpoint()

        # Local cluster coefficient
        if "local_cluster_coefficient" in self.config.types:
            logger.info("Calculating local cluster coefficients.")
            edge_weight = 'weight' if self.config.weighted else None
            record_statistics("local cluster coefficient",
                              self._local_cluster_coefficient(weight=edge_weight))
            checkpoint()

        # Density, diameter, average shortest path, global clustering coefficient and cliques
        for key, label, compute in global_metrics:
            if key in self.config.types:
                record(f"{label}: {compute()}")
                checkpoint()
def _node_count(self):
    """Return the number of nodes of the loaded graph.

    :return: Node count
    :rtype: `int`
    """
    graph = self.g
    return graph.order()
def _edge_count(self):
    """Return the number of edges of the loaded graph.

    :return: Edge count
    :rtype: `int`
    """
    graph = self.g
    return graph.size()
def _component_count(self):
    """Return the number of connected components of the loaded graph.

    :return: Component count
    :rtype: `int`
    """
    # Count lazily instead of materialising every component set in a list first.
    return sum(1 for _ in nx.connected_components(self.g))
def _max_component(self, return_component=False):
    """Return the largest connected component or its size.

    :param return_component: If `True`, return the component (its node set) instead of its size
    :type return_component: `bool`
    :return: Largest component size, or the largest component itself
    :rtype: `set` or `int`
    """
    largest = max(nx.connected_components(self.g), key=len)
    return largest if return_component else len(largest)
def _avg_degree(self):
    """Return average node degree.

    Every edge contributes to the degree of both of its end points, so the sum of all node
    degrees is twice the number of edges. The average degree is therefore `2 * E / N`, where
    `E` is `self.g.size()` and `N` is `self.g.order()`.

    :return: Average node degree; `0.0` for a graph without nodes
    :rtype: `float`
    """
    node_count = self.g.order()
    if node_count == 0:
        # Avoid a ZeroDivisionError for an empty graph.
        return 0.0
    return 2.0 * self.g.size() / node_count
def _degree_distribution(self, weight=None):
    """Calculate, plot and save degree distributions.

    Four figures are written below `self.config.img`, each prefixed with `self.config.config_id`:
    a normalised degree histogram on log-log axes, a bar chart of node counts per degree, a
    log-log scatter plot of node counts per degree and the same scatter plot on linear axes.
    Each figure is additionally shown when `self.config.verbose` is truthy.

    If the graph has no nodes, a warning is logged and nothing is plotted.

    :param weight: Edge attribute that contains weight information
    :type weight: `str`
    """
    degreeview = self.g.degree(weight=weight)
    degree_sequence = sorted((d for _, d in degreeview), reverse=True)
    if not degree_sequence:
        # max() and zip(*...) below would fail on an empty sequence.
        logger.warning("Graph has no nodes; skipping degree distribution plots.")
        return

    degree_count = collections.Counter(degree_sequence)
    deg, cnt = zip(*degree_count.items())
    deg = [int(x) for x in deg]  # convert to integer values

    def save_current_figure(name):
        # Save (and optionally show) the current figure, then close it.
        filename, suffix, ext = MainUtils.get_file_info(
            self.config.img + self.config.config_id + name, suffix=self.config.name
        )
        plt.savefig(filename + suffix + ext)
        if self.config.verbose:
            plt.show()
        plt.close()

    plt.rc('axes', titlesize=14)
    plt.rc('axes', labelsize=14)
    plt.rc('xtick', labelsize=14)
    plt.rc('ytick', labelsize=14)

    # Relative density
    _, ax = plt.subplots(figsize=(10, 6))
    # Bin edges must reach beyond the maximum degree: values outside the outermost edges are
    # ignored, so edges 0 .. max + 1 are needed to keep the highest-degree nodes in the plot
    # (and to have at least one bin when the maximum degree is 0).
    max_degree = int(degree_sequence[0])  # sequence is sorted in descending order
    ax.hist(degree_sequence, bins=range(0, max_degree + 2),
            density=True, histtype='bar', rwidth=0.8, color="orange")
    ax.set_yscale('log')
    ax.set_xscale('log')
    ax.set_xlabel('Knotengrad')
    ax.set_ylabel('Relative Dichte')
    save_current_figure("_degree_density.png")

    # Histogram
    _, ax = plt.subplots(figsize=(10, 6))
    ax.bar(deg, cnt, color="orange")
    ax.set_ylabel("Anzahl an Knoten")
    ax.set_xlabel("Knotengrad")
    ax.set_xticks([d + 0.4 for d in deg])
    ax.set_xticklabels(deg)
    save_current_figure("_degree_histogram.png")

    # Degree distribution logarithmic scale
    _, ax = plt.subplots(figsize=(10, 6))
    ax.grid(True)
    ax.set_xlabel('Knotengrad')
    ax.set_ylabel('Anzahl an Knoten')
    ax.set_xscale("log")
    ax.set_yscale("log")
    ax.plot(deg, cnt, 'o', color="orange")
    save_current_figure("_degree_distribution_log-log.png")

    # Degree distribution (normal scale)
    _, ax = plt.subplots(figsize=(10, 6))
    ax.grid(True)
    # degree_count already holds the number of nodes per (unconverted) degree value.
    values = sorted(degree_count)
    hist = [degree_count[value] for value in values]
    ax.plot(values, hist, 'o', color='orange')
    ax.set_xlabel('Knotengrad')
    ax.set_ylabel('Anzahl an Knoten')
    save_current_figure("_degree_distribution.png")
def _top_dc_nodes(self, k=5):
    """Get top k Degree Centrality nodes.

    The collected `(degree, node_id)` pairs are logged, appended to the results as a single
    line and the results are written to the output file.

    :param k: Number of nodes that will be collected; values below 1 collect nothing
    :type k: `int`
    """
    # Request at least k candidates so that k is not silently capped at 100.
    # NOTE(review): assumes DictUtils.get_top_keys(d, n) yields up to n keys ordered by
    # descending value - confirm against its implementation.
    top_dc = DictUtils.get_top_keys(nx.degree_centrality(self.g), max(k, 100))
    nodes = []
    for rank, node_id in enumerate(top_dc, start=1):
        if rank > k:
            break
        degree = self.g.degree[node_id]
        logger.info(f"Top {rank} Degree Centrality node: {degree}, {node_id}")
        nodes.append((degree, node_id))
    self._results.append(f"Top degree centrality nodes: {nodes}")
    filename, suffix, ext = MainUtils.get_file_info(
        self.config.o, suffix=self.config.name
    )
    save_file(filename + suffix + ext, self._results)
def _degree_centrality(self):
    """Calculate, plot and save the Degree Centrality distribution.

    Degree centralities are rounded to four decimals. A bar chart of the binned values (at most
    40 bins, bar heights scaled so that they sum to 1) is saved below `self.config.img` and shown
    when `self.config.verbose` is truthy.

    :return: The distinct rounded Degree Centrality values in descending order
    :rtype: `tuple`
    """
    centralities = nx.degree_centrality(self.g)
    rounded = sorted((round(value, 4) for value in centralities.values()), reverse=True)
    # NOTE(review): only the distinct values are binned and returned, their multiplicities are
    # dropped - confirm that this is intended for the histogram and the statistics built on it.
    dc, _occurrences = zip(*collections.Counter(rounded).items())

    plt.xlabel('Degree Centrality')
    plt.ylabel('Relative Häufigkeit')
    bin_count = min(len(dc), 40)
    hist, bins = np.histogram(dc, bins=bin_count, density=True)
    # Normalize y axis to [0,1]
    heights = hist.astype(np.float32) / hist.sum()
    bar_width = (bins[1] - bins[0]) * 0.8
    plt.bar(bins[:-1], heights, width=bar_width, color='orange')

    target = self.config.img + self.config.config_id + "_degree_centrality.png"
    filename, suffix, ext = MainUtils.get_file_info(target, suffix=self.config.name)
    plt.savefig(filename + suffix + ext)
    if self.config.verbose:
        plt.show()
    plt.close()
    return dc
def _local_cluster_coefficient(self, weight=None):
    """Calculate, plot and save the distribution of local cluster coefficients.

    Coefficients are rounded to four decimals. A bar chart of the binned values (at most 40 bins,
    bar heights scaled so that they sum to 1) is saved below `self.config.img` and shown when
    `self.config.verbose` is truthy.

    :param weight: Edge attribute that contains weight information
    :type weight: `str`
    :return: The distinct rounded local cluster coefficients in descending order
    :rtype: `tuple`
    """
    coefficients = nx.algorithms.cluster.clustering(self.g, weight=weight)
    rounded = sorted((round(value, 4) for value in coefficients.values()), reverse=True)
    # NOTE(review): only the distinct values are binned and returned, their multiplicities are
    # dropped - confirm that this is intended for the histogram and the statistics built on it.
    cc, _occurrences = zip(*collections.Counter(rounded).items())

    plt.xlabel('Lokaler Cluster-Koeffizient')
    plt.ylabel('Relative Häufigkeit')
    bin_count = min(len(cc), 40)
    hist, bins = np.histogram(cc, bins=bin_count, density=True)
    # Normalize y axis to [0,1]
    heights = hist.astype(np.float32) / hist.sum()
    bar_width = (bins[1] - bins[0]) * 0.8
    plt.bar(bins[:-1], heights, width=bar_width, color='orange')

    target = self.config.img + self.config.config_id + "_local_cluster_coefficient.png"
    filename, suffix, ext = MainUtils.get_file_info(target, suffix=self.config.name)
    plt.savefig(filename + suffix + ext)
    if self.config.verbose:
        plt.show()
    plt.close()
    return cc
def _density(self):
    """Return the density of the loaded graph as computed by `nx.density`.

    :return: Graph density
    :rtype: `float`
    """
    graph = self.g
    return nx.density(graph)
def _diameter(self):
    """Return the graph diameter.

    If the graph is not connected, the diameter of its largest connected component is returned.

    :return: Graph diameter
    :rtype: `int`
    """
    graph = self.g
    if not nx.is_connected(graph):
        # Fall back to the largest connected component of a disconnected graph.
        graph = graph.subgraph(self._max_component(return_component=True))
    return nx.algorithms.diameter(graph)
def _avg_shortest_path(self):
    """Return the average shortest path length, using the `weight` edge attribute.

    If the graph is not connected, the value is computed on its largest connected component.

    :return: Average shortest path
    :rtype: `float`
    """
    graph = self.g
    if not nx.is_connected(graph):
        # Fall back to the largest connected component of a disconnected graph.
        graph = graph.subgraph(self._max_component(return_component=True))
    return nx.average_shortest_path_length(graph, weight='weight')
def _global_cluster_coefficient(self):
    """Return the global cluster coefficient.

    The value is the result of `nx.average_clustering` for the loaded graph.

    :return: Global cluster coefficient
    :rtype: `float`
    """
    graph = self.g
    return nx.average_clustering(graph)
def _clique_number(self):
    """Return clique number (i.e., size of largest clique).

    Computed from `nx.find_cliques` (the maximal cliques of the graph) rather than
    `nx.graph_clique_number`, which was deprecated in NetworkX 3.0 and later removed.

    :return: Size of largest clique; `0` if the graph has no cliques (empty graph)
    :rtype: `int`
    """
    return max((len(clique) for clique in nx.find_cliques(self.g)), default=0)
def _nr_of_cliques(self):
    """Return number of (maximal) cliques.

    Computed from `nx.find_cliques` rather than `nx.graph_number_of_cliques`, which was
    deprecated in NetworkX 3.0 and later removed.

    :return: Number of cliques
    :rtype: `int`
    """
    # Count lazily; the cliques themselves are not needed.
    return sum(1 for _ in nx.find_cliques(self.g))