Source code for ndex2.nice_cx_network

__author__ = 'aarongary'

import sys
import pandas as pd
import networkx as nx
import io
import decimal
import numpy as np
import json
import ijson
import requests
import base64
import logging

from ndex2.client import Ndex2
from ndex2.exceptions import NDExError
from ndex2.exceptions import NDExNotFoundError
from ndex2.exceptions import NDExUnauthorizedError
from ndex2.exceptions import NDExInvalidParameterError

from ndex2 import constants
from ndex2.util import PandasDataConverter

if sys.version_info.major == 3:
    from urllib.request import urlopen, Request, HTTPError, URLError
else:
    from urllib2 import urlopen, Request, HTTPError, URLError


class NiceCXNetwork:
    """
    Holds the aspects of a CX network (nodes, edges, their attributes,
    citations, supports, opaque aspects and metadata) in plain
    dicts and lists.
    """
    VISUAL_PROPERTIES = 'visualProperties'
    CY_VISUAL_PROPERTIES = 'cyVisualProperties'
    PROPERTIES_OF = 'properties_of'
    PROPS_OF_NODES = 'nodes'
    PROPS_OF_EDGES = 'edges'
    META_DATA = 'metaData'

    def __init__(self, **attr):
        """
        Constructor. Every aspect container starts out empty.

        :param attr: Keyword arguments; not used by the constructor
        """
        self.logger = logging.getLogger(__name__)

        # Counters that hand out the next node / edge id
        self.node_int_id_generator = 0
        self.edge_int_id_generator = 0

        # Core aspects keyed by node / edge id
        self.metadata = {}
        self.nodes = {}
        self.edges = {}
        self.nodeAttributes = {}
        self.edgeAttributes = {}
        self.nodeAttributeHeader = set()
        self.edgeAttributeHeader = set()
        self.networkAttributes = []

        # Citations and supports
        self.citations = {}
        self.supports = {}
        self.nodeCitations = {}
        self.edgeCitations = {}
        self.nodeSupports = {}
        self.edgeSupports = {}

        # Everything else
        self.nodeAssociatedAspects = {}
        self.edgeAssociatedAspects = {}
        self.opaqueAspects = {}
        self.provenance = []
        self.missingNodes = {}
        self.node_id_lookup = []
        self.node_name_to_id_map_cache = {}
        self.s = None

    @staticmethod
    def _is_python_three_or_greater():
        """
        Checks ``sys.version_info.major`` to see which Python is running

        :return: True if Python 3 or greater is found, otherwise False
        :rtype: bool
        """
        return bool(sys.version_info.major >= 3)

    def __create_edge(self, edge_id=None, edge_source=None, edge_target=None, edge_interaction=None):
        """
        Stores a new edge under **edge_id** in the edges table

        :param edge_id: Id to store the edge under
        :type edge_id: int
        :param edge_source: The source node of this edge, either its id
                            or the node object itself.
        :type edge_source: int or dict
        :param edge_target: The target node of this edge, either its id
                            or the node object itself.
        :type edge_target: int or dict
        :param edge_interaction: The interaction that describes the
                                 relationship between the source and
                                 target nodes. Omitted from the edge
                                 when `None`
        :type edge_interaction: string
        :return: Edge ID
        :rtype: int
        """
        def _endpoint_id(endpoint):
            # A node passed as a dict is reduced to its id
            if isinstance(endpoint, dict):
                return endpoint.get(constants.EDGE_ID)
            return endpoint

        new_edge = {constants.EDGE_ID: edge_id,
                    constants.EDGE_SOURCE: _endpoint_id(edge_source),
                    constants.EDGE_TARGET: _endpoint_id(edge_target)}
        if edge_interaction is not None:
            new_edge[constants.EDGE_INTERACTION] = edge_interaction

        self.edges[edge_id] = new_edge
        return edge_id
def create_edge(self, edge_source=None, edge_target=None, edge_interaction=None):
    """
    Adds an edge (source-interaction-target) to the network, using the
    next free edge id.

    Example:

        ``my_edge = create_edge(edge_source=my_node, edge_target=my_node2, edge_interaction='up-regulates')``

    :param edge_source: The source node of this edge, either its id
                        or the node object itself.
    :type edge_source: int, dict (with :py:const:`~ndex2.constants.EDGE_ID` property)
    :param edge_target: The target node of this edge, either its id
                        or the node object itself.
    :type edge_target: int, dict (with :py:const:`~ndex2.constants.EDGE_ID` property)
    :param edge_interaction: The interaction that describes the relationship
                             between the source and target nodes
    :type edge_interaction: string
    :return: Edge ID
    :rtype: int
    """
    new_edge_id = self.edge_int_id_generator
    self.__create_edge(edge_id=new_edge_id,
                       edge_source=edge_source,
                       edge_target=edge_target,
                       edge_interaction=edge_interaction)
    # The counter advances only once the edge has been stored
    self.edge_int_id_generator += 1
    return new_edge_id
# ==================
# NODE OPERATIONS
# ==================
def __create_node(self, node_id=None, node_name=None, node_represents=None):
    """
    Stores a new node in the nodes table

    :param node_id: Id to store the node under. If `None` the next id
                    from :py:meth:`get_next_node_id` is used
    :type node_id: int
    :param node_name: Name of the node
    :type node_name: str
    :param node_represents: Representation of the node. If `None`
                            **node_name** is used instead
    :type node_represents: str
    :return: Id the node was stored under
    :rtype: int
    """
    if node_id is None:
        node_id = self.get_next_node_id()

    # Fall back to the name when no separate representation is given
    represents = node_name if node_represents is None else node_represents

    self.nodes[node_id] = {constants.NODE_ID: node_id,
                           constants.NODE_NAME: node_name,
                           constants.NODE_REPRESENTS: represents}
    # Previously returned the builtin ``id`` function by mistake
    return node_id
def create_node(self, node_name=None, node_represents=None):
    """
    Adds a node with the given name and represents (external id) to the
    network, using the next free node id.

    Example:

        ``my_node = create_node(node_name='MAPK1', node_represents='1114208')``

    :param node_name: Name of the node
    :type node_name: str
    :param node_represents: Representation of the node (alternate identifier)
    :type node_represents: str
    :return: Node ID
    :rtype: int
    """
    new_node_id = self.node_int_id_generator
    self.__create_node(node_id=new_node_id,
                       node_name=node_name,
                       node_represents=node_represents)
    # The counter advances only once the node has been stored
    self.node_int_id_generator += 1
    return new_node_id
def add_network_attribute(self, name=None, values=None, type=None, subnetwork=None):
    """
    Add an attribute to the network, replacing the value and data type
    of the first existing attribute with the same name

    :param name: Name of the attribute
    :type name: str
    :param values: The value(s) of the attribute
    :type values: One of the allowable CX types. See `Supported data types`_
    :param type: The type of data supplied in values. When `None` no
                 data type is stored. See `Supported data types`_
    :type type: str
    :param subnetwork: Not used
    :return: None
    :rtype: None
    """
    for existing in self.networkAttributes:
        if existing.get('n') != name:
            continue
        existing['v'] = values
        # Drop any stale data type before applying the new one
        existing.pop('d', None)
        if type is not None:
            existing['d'] = type
        return

    network_attribute = {'n': name, 'v': values}
    if type is not None:
        network_attribute['d'] = type
    self.networkAttributes.append(network_attribute)


def add_citation(self, id, title=None, contributor=None, identifier=None,
                 type=None, description=None, attributes=None):
    """
    Creates a citation element and stores it under **id**. Only
    arguments that are not `None` are added to the element.

    :param id: Id of the citation
    :param title: Stored as ``dc:title``
    :param contributor: Stored as ``dc:contributor``
    :param identifier: Stored as ``dc:identifier``
    :param type: Stored as ``dc:type``
    :param description: Stored as ``dc:description``
    :param attributes: Stored as ``attributes``
    :return: The citation element that was stored
    :rtype: dict
    """
    add_this_citation = {'@id': id}
    optional_fields = (('dc:contributor', contributor),
                       ('dc:identifier', identifier),
                       ('dc:type', type),
                       ('dc:title', title),
                       ('dc:description', description),
                       # The key used to be the attributes value itself,
                       # which failed for lists/dicts and lost the name
                       ('attributes', attributes))
    for key, value in optional_fields:
        if value is not None:
            add_this_citation[key] = value

    self.citations[id] = add_this_citation
    return add_this_citation


def add_edge_citations(self, edge_id, citation):
    """
    Links a citation to an edge

    :param edge_id: Id of the edge
    :param citation: Citation id or citation dict with ``@id`` key
    :return: None
    """
    citation_id = citation.get('@id') if isinstance(citation, dict) else citation
    self.build_many_to_many_relation('edgeCitations',
                                     {'po': [edge_id],
                                      'citations': [citation_id]},
                                     'citations')


def add_support(self, id=None, text=None, citation_id=None, attributes=None, props=None):
    """
    Creates a support element and stores it under **id**. `None`
    arguments, and empty **attributes** / **props**, are left out.

    :param id: Id of the support
    :param text: Stored as ``text``
    :param citation_id: Stored as ``citation``
    :param attributes: Stored as ``attributes``
    :param props: Stored as ``properties``
    :return: The support element that was stored
    :rtype: dict
    """
    add_this_supports = {'@id': id}
    if text is not None:
        add_this_supports['text'] = text
    if citation_id is not None:
        add_this_supports['citation'] = citation_id
    if attributes is not None and len(attributes) > 0:
        add_this_supports['attributes'] = attributes
    if props is not None and len(props) > 0:
        add_this_supports['properties'] = props

    self.supports[id] = add_this_supports
    return add_this_supports


def add_edge_supports(self, edge_id, support):
    """
    Links a support to an edge

    :param edge_id: Id of the edge
    :param support: Support id or support dict with ``@id`` key
    :return: None
    """
    support_id = support.get('@id') if isinstance(support, dict) else support
    self.build_many_to_many_relation('edgeSupports',
                                     {'po': [edge_id],
                                      'supports': [support_id]},
                                     'supports')


def build_many_to_many_relation(self, aspect_name, element, relation_name):
    """
    Adds the ids found under **relation_name** in **element** to every
    node/edge id listed under ``po`` in **element**

    :param aspect_name: ``nodeCitations``, ``edgeCitations`` or
                        ``edgeSupports``
    :type aspect_name: str
    :param element: dict with ``po`` list and a **relation_name** list
    :type element: dict
    :param relation_name: key in **element** holding the related ids
    :type relation_name: str
    :raises Exception: If **aspect_name** is not one of the supported names
    :return: None
    """
    if aspect_name not in ('nodeCitations', 'edgeCitations', 'edgeSupports'):
        raise Exception('Only nodeCitations, edgeCitations and '
                        'edgeSupports are supported. ' + aspect_name +
                        ' was supplied')
    # The supported aspect names match the attribute names on self
    aspect = getattr(self, aspect_name)

    related = element.get(relation_name)
    for po in element.get('po'):
        if aspect.get(po) is None:
            # Store a copy so entries for different ids never share a list
            aspect[po] = list(related) if isinstance(related, list) else related
        else:
            aspect[po] += related


# TODO
# make opaque aspect into a one shot method to set the whole aspect.
# i.e. not one element at a time
def add_opaque_aspect(self, aspect_name, aspect):
    """
    Sets the opaque aspect **aspect_name**. A dict is wrapped in a list,
    except that a dict containing an ``error`` key is silently ignored.

    :param aspect_name: Name of the aspect
    :type aspect_name: str
    :param aspect: Aspect elements
    :type aspect: list or dict
    :raises Exception: If **aspect** is neither list nor dict
    :return: None
    """
    if isinstance(aspect, list):
        self.opaqueAspects[aspect_name] = aspect
        return
    if isinstance(aspect, dict):
        if 'error' not in aspect:
            self.opaqueAspects[aspect_name] = [aspect]
        return
    raise Exception('Provided input was not of type list.')


def add_opaque_aspect_element(self, opaque_element):
    """
    Deprecated, always raises.

    :raises Exception: Always
    """
    raise Exception('add_opaque_aspect_element() is deprecated. '
                    'Please use add_opaque_aspect()')
def set_name(self, network_name):
    """
    Stores the network name as the ``name`` network attribute
    (data type ``string``)

    Example:

        ``set_name('P38 Signaling')``

    :param network_name: Network name
    :type network_name: string
    :return: None
    :rtype: None
    """
    name_attribute = dict(name='name', values=network_name, type='string')
    self.add_network_attribute(**name_attribute)
def get_name(self):
    """
    Looks up the value of the first network attribute called ``name``

    :return: Network name or `None` if no such attribute exists
    :rtype: string
    """
    name_values = (net_attr.get('v') for net_attr in self.networkAttributes
                   if net_attr.get('n') == 'name')
    return next(name_values, None)
def add_name_space(self, prefix, uri):
    """
    Adds a prefix to URI mapping to the ``@context`` network attribute,
    creating that attribute when it does not exist yet

    :param prefix: Namespace prefix
    :type prefix: str
    :param uri: URI the prefix stands for
    :type uri: str
    :return: None
    """
    for net_attr in self.networkAttributes:
        if net_attr.get('n') == '@context':
            # The context is kept as a JSON string, so decode, update, encode
            current_context = json.loads(net_attr['v'])
            current_context[prefix] = uri
            net_attr['v'] = json.dumps(current_context)
            break
    else:
        self.add_network_attribute(name='@context',
                                   values=json.dumps({prefix: uri}),
                                   type='string')


def set_namespaces(self, ns):
    """
    Alias of :py:meth:`set_context`

    :param ns: context passed through to :py:meth:`set_context`
    :return: None
    """
    self.set_context(ns)


def get_namespaces(self,):
    """
    Alias of :py:meth:`get_context`

    :return: whatever :py:meth:`get_context` returns
    """
    return self.get_context()
def get_edges(self):
    """
    Gives (edge id, edge object) pairs for all edges in the network.

    Example:

        ``for edge_id, edge_obj in nice_cx.get_edges():``

            ``print(edge_obj.get('i')) # print interaction``

            ``print(edge_obj.get('s')) # print source node id``

    :return: Edge iterator
    :rtype: iterator
    """
    # Python 2 dicts need iteritems() to avoid building a list
    if not NiceCXNetwork._is_python_three_or_greater():
        return self.edges.iteritems()
    return self.edges.items()
def get_edge(self, edge):
    """
    Looks up an edge by its id

    :param edge: Edge id
    :return: Edge object or `None` if the id is unknown
    :rtype: dict
    """
    edge_table = self.edges
    return edge_table.get(edge)

# ==============================
# NETWORK PROPERTY OPERATIONS
# ==============================
def get_network_attribute(self, attribute_name):
    """
    Finds the first network attribute whose name is **attribute_name**

    :param attribute_name: Attribute name
    :type attribute_name: string
    :return: Network attribute object or `None` if not found
    :rtype: dict
    """
    matches = (net_attr for net_attr in self.networkAttributes
               if net_attr.get('n') == attribute_name)
    return next(matches, None)
def get_network_attribute_names(self):
    """
    Generator over the names of the network attributes. Attributes
    lacking the name key are skipped.

    :return: attribute name via a generator
    :rtype: string
    """
    name_key = constants.NET_ATTR_NAME
    for net_attr in self.networkAttributes:
        if name_key in net_attr:
            yield net_attr.get(name_key)
def get_next_node_id(self):
    """
    Hands out the current node id counter value and advances the counter

    :return: Next node id
    :rtype: int
    """
    return_id = self.node_int_id_generator
    self.node_int_id_generator += 1
    return return_id


def add_node_attribute(self, property_of=None, name=None, values=None,
                       type=None, subnetwork=None, overwrite=False):
    """
    Appends an attribute to a node

    :param property_of: Node id or node dict containing the node id
    :type property_of: int or dict
    :param name: Attribute name
    :type name: str
    :param values: Attribute value(s)
    :param type: Data type of **values**. When `None` the type is
                 inferred: ``double`` for float, ``integer`` for int,
                 ``list_of_string`` for list, otherwise no type is stored
    :type type: str
    :param subnetwork: Not used
    :param overwrite: If `True` all existing attributes of the node
                      named **name** are removed first
    :type overwrite: bool
    :raises NDExError: If **property_of**, **name** or **values** is
                       `None` or the node dict has no id
    :return: None
    """
    if property_of is None:
        raise NDExError('Node attribute requires property_of')

    if isinstance(property_of, dict):
        node_id = property_of.get(constants.NODE_ID)
        if node_id is None:
            raise NDExError('No id found in Node')
    else:
        node_id = property_of

    if name is None or values is None:
        raise NDExError('Node attribute requires the name and values property')

    if self.nodeAttributes.get(node_id) is None:
        self.nodeAttributes[node_id] = []
    node_attrs = self.nodeAttributes[node_id]

    if overwrite is True:
        # Filter into the same list object. Deleting entries while
        # enumerating skipped the entry after each deletion, so adjacent
        # duplicates survived.
        node_attrs[:] = [attr for attr in node_attrs
                         if attr[constants.NODE_ATTR_NAME] != name]

    n_attrib = {constants.NODE_ATTR_PROPERTYOF: node_id,
                constants.NODE_ATTR_NAME: name,
                constants.NODE_ATTR_VALUE: values}

    if type is None:
        # NOTE(review): bool is a subclass of int, so True/False are
        # labeled 'integer' here - confirm whether that is intended
        attr_type = None
        if isinstance(values, float):
            attr_type = 'double'
        elif isinstance(values, int):
            attr_type = 'integer'
        elif isinstance(values, list):
            attr_type = 'list_of_string'
        if attr_type:
            n_attrib[constants.NODE_ATTR_DATATYPE] = attr_type
    else:
        n_attrib[constants.NODE_ATTR_DATATYPE] = type

    node_attrs.append(n_attrib)


def add_edge_attribute(self, property_of, name, values, type=None, subnetwork=None):
    """
    Appends an attribute to an edge

    :param property_of: Edge id or edge dict with ``@id`` key
    :type property_of: int or dict
    :param name: Attribute name
    :type name: str
    :param values: Attribute value(s)
    :param type: Data type of **values**; not stored when `None`
    :type type: str
    :param subnetwork: Not used
    :return: None
    """
    if isinstance(property_of, dict):
        property_of = property_of.get('@id')

    if self.edgeAttributes.get(property_of) is None:
        self.edgeAttributes[property_of] = []

    edge_attr = {'po': property_of, 'n': name, 'v': values}
    if type is not None:
        edge_attr['d'] = type
    self.edgeAttributes[property_of].append(edge_attr)
def get_nodes(self):
    """
    Gives (node id, node object) pairs for all nodes in the network.

    Example:

        ``for id, node in nice_cx.get_nodes():``

            ``node_name = node.get('n')``

            ``node_represents = node.get('r')``

    :return: iterator over nodes
    :rtype: iterator
    """
    # Python 2 dicts need iteritems() to avoid building a list
    if not NiceCXNetwork._is_python_three_or_greater():
        return self.nodes.iteritems()
    return self.nodes.items()
def get_node(self, node_id):
    """
    Looks up a node by its id

    :param node_id: Node id
    :return: Node object or `None` if the id is unknown
    :rtype: dict
    """
    return self.nodes.get(node_id)


def _generate_node_name_to_id_map(self):
    """
    Rebuilds the node name to node id cache from the current nodes.
    When several nodes share a name, the one iterated last wins.

    :return: None
    """
    self.node_name_to_id_map_cache = {node.get('n'): node_id
                                      for node_id, node in self.get_nodes()}


def get_node_by_name(self, node_name):
    """
    Looks up a node by its name using the name to id cache.

    The cache is rebuilt when it is empty and, because nothing else
    refreshes it, also when the lookup misses. That way nodes added
    after the first lookup are still found.

    :param node_name: Name of the node
    :type node_name: str
    :return: Node object or `None` if no node has that name
    :rtype: dict
    """
    def _cached_lookup():
        node_id = self.node_name_to_id_map_cache.get(node_name)
        if node_id is None:
            return None
        return self.nodes.get(node_id)

    if len(self.node_name_to_id_map_cache) < 1:
        self._generate_node_name_to_id_map()
        return _cached_lookup()

    found = _cached_lookup()
    if found is None:
        # Possibly a stale cache; refresh once and retry
        self._generate_node_name_to_id_map()
        found = _cached_lookup()
    return found

# TODO - Check edges for orphans.  Check node attributes for orphans

#=============================
# NODE ATTRIBUTES OPERATIONS
#=============================
def set_node_attribute(self, node, attribute_name, values, type=None, overwrite=False):
    """
    Adds an attribute to a node by delegating to
    :py:meth:`add_node_attribute`. The node may be given as id or as
    node dict.

    Example:

        ``set_node_attribute(my_node, 'Pathway', 'Signal Transduction / Growth Regulation')``

        or

        ``set_node_attribute(my_node, 'Mutation Frequency', 0.007, type='double')``

    :param node: Node to add the attribute to
    :type node: int or node dict with @id attribute
    :param attribute_name: attribute name
    :type attribute_name: string
    :param values: A value or list of values of the attribute
    :type values: list, string, int or double
    :param type: The datatype of the attribute values.
                 See `Supported data types`_
    :type type: str
    :param overwrite: If `True` existing node attributes named
                      **attribute_name** are removed first, otherwise
                      the attribute is simply appended
    :type overwrite: bool
    :return: None
    :rtype: None
    """
    forwarded = dict(property_of=node,
                     name=attribute_name,
                     values=values,
                     type=type,
                     overwrite=overwrite)
    self.add_node_attribute(**forwarded)
def get_node_attribute(self, node, attribute_name):
    """
    Finds the first attribute named **attribute_name** on a node. The
    node may be given as id or as node dict.

    Example:

        ``get_node_attribute(my_node, 'Pathway')``

    :param node: node object or node id
    :type node: int or node dict with @id attribute
    :param attribute_name: attribute name
    :type attribute_name: str
    :return: the node attribute object or None if the attribute
             doesn't exist
    :rtype: dict
    """
    # A missing or empty attribute list is treated as "nothing to search"
    candidates = self.get_node_attributes(node) or ()
    matches = (attr for attr in candidates
               if attr.get('n') == attribute_name)
    return next(matches, None)
def get_node_attribute_value(self, node, attribute_name):
    """
    Finds the value of the first attribute named **attribute_name** on
    a node. The node may be given as id or as node dict.

    Example:

        ``get_node_attribute_value(my_node, 'Pathway')``

        ``# returns: 'Signal Transduction / Growth Regulation'``

    :param node: node object or node id
    :type node: int or node dict with @id attribute
    :param attribute_name: attribute name
    :type attribute_name: str
    :return: the value of the attribute or None if the attribute
             doesn't exist
    :rtype: string
    """
    # A missing or empty attribute list is treated as "nothing to search"
    candidates = self.get_node_attributes(node) or ()
    values = (attr.get('v') for attr in candidates
              if attr.get('n') == attribute_name)
    return next(values, None)
def get_node_attributes(self, node):
    """
    Gets all attribute objects of a node. The node may be given as id
    or as node dict.

    Example:

        ``get_node_attributes(my_node)``

        ``# returns: [{'po': 0, 'n': 'Pathway', 'v': 'Signal Transduction / Growth Regulation'}]``

    :param node: node object or node id
    :type node: int or node dict with @id attribute
    :return: node attributes or `None` if the node has none
    :rtype: list
    """
    node_id = node.get('@id') if isinstance(node, dict) else node
    return self.nodeAttributes.get(node_id)
def set_network_attribute(self, name, values=None, type=None):
    """
    Sets a network attribute by delegating to
    :py:meth:`add_network_attribute`

    .. code-block:: python

        from ndex2.nice_cx_network import NiceCXNetwork

        net = NiceCXNetwork()
        net.set_network_attribute(name='networkType',
                                  values='Genetic interactions')

    :param name: Attribute name
    :type name: str
    :param values: The values of the attribute
    :type values: list, str, float, or int
    :param type: The datatype of the attribute values.
                 See `Supported data types`_
    :type type: str
    :return: Result of :py:meth:`add_network_attribute`
    :rtype: none
    """
    result = self.add_network_attribute(name, values=values, type=type)
    return result
def set_edge_attribute(self, edge, attribute_name, values, type=None):
    """
    Adds an attribute to an edge by delegating to
    :py:meth:`add_edge_attribute`. The edge may be given as id or as
    edge dict.

    Example:

        ``set_edge_attribute(0, 'weight', 0.5, type='double')``

        or

        ``set_edge_attribute(my_edge, 'Disease', 'Atherosclerosis')``

    :param edge: Edge to add the attribute to
    :type edge: int or edge dict with @id attribute
    :param attribute_name: Attribute name
    :type attribute_name: str
    :param values: A value or list of values of the attribute
    :type values: list
    :param type: The datatype of the attribute values.
                 See `Supported data types`_
    :type type: str
    :return: None
    :rtype: None
    """
    forwarded = dict(property_of=edge,
                     name=attribute_name,
                     values=values,
                     type=type)
    self.add_edge_attribute(**forwarded)
#TODO add support for subnetworks
def get_edge_attributes(self, edge):
    """
    Gets all attribute objects of an edge. The edge may be given as id
    or as edge dict.

    Example:

        ``get_edge_attributes(my_edge)``

    :param edge: Edge object or edge id
    :type edge: int or edge dict with @id attribute
    :return: Edge attribute objects or `None` if the edge has none
    :rtype: list of edge dict
    """
    edge_id = edge.get('@id') if isinstance(edge, dict) else edge
    return self.edgeAttributes.get(edge_id)
def get_edge_attribute(self, edge, attribute_name):
    """
    Finds the first attribute named **attribute_name** on an edge. The
    edge may be given as id or as edge dict.

    Example:

        ``get_edge_attribute(my_edge, 'weight')``

    .. note::

        When the attribute is not found the tuple ``(None, None)`` is
        returned, which evaluates to `True` in a boolean context

    :param edge: Edge object or edge id
    :type edge: int or edge dict with @id attribute
    :param attribute_name: Attribute name
    :type attribute_name: str
    :return: Edge attribute object, or ``(None, None)`` if not found
    :rtype: dict or tuple
    """
    # A missing or empty attribute list is treated as "nothing to search"
    candidates = self.get_edge_attributes(edge) or ()
    matches = [attr for attr in candidates
               if attr.get('n') == attribute_name]
    if matches:
        return matches[0]
    return None, None
def get_edge_attribute_value(self, edge, attribute_name):
    """
    Finds the value of the first attribute named **attribute_name** on
    an edge. The edge may be given as id or as edge dict.

    Example:

        ``get_edge_attribute_value(my_edge, 'weight')``

        ``# returns: 0.849``

    .. note::

        When the attribute is not found the tuple ``(None, None)`` is
        returned, which evaluates to `True` in a boolean context

    :param edge: Edge object or edge id
    :type edge: int or edge dict with @id attribute
    :param attribute_name: Attribute name
    :type attribute_name: str
    :return: Edge attribute value(s), or ``(None, None)`` if not found
    :rtype: list, string, int or double
    """
    # A missing or empty attribute list is treated as "nothing to search"
    candidates = self.get_edge_attributes(edge) or ()
    matches = [attr for attr in candidates
               if attr.get('n') == attribute_name]
    if matches:
        return matches[0].get('v')
    return None, None
def get_node_attributesx(self):
    """
    Gives (node id, attribute list) pairs for all nodes with attributes

    :return: items view of the node attributes table
    """
    return self.nodeAttributes.items()


def remove_node(self, node):
    """
    Removes a node from the nodes table. Edges and attributes that
    refer to the node are left untouched.

    :param node: Node id
    :return: The removed node or `None` if the id is unknown
    :rtype: dict
    """
    return self.nodes.pop(node, None)


def remove_node_attribute(self, node, attribute_name):
    """
    Removes the first attribute named **attribute_name** from a node

    :param node: node object or node id
    :type node: int or node dict with @id attribute
    :param attribute_name: attribute name
    :type attribute_name: str
    :return: None
    """
    node_attrs = self.get_node_attributes(node)
    if not node_attrs:
        return
    for position, attr in enumerate(node_attrs):
        if attr.get('n') == attribute_name:
            del node_attrs[position]
            return


def remove_edge(self, edge):
    """
    Removes an edge from the edges table. Attributes that refer to the
    edge are left untouched.

    :param edge: Edge id
    :return: The removed edge or `None` if the id is unknown
    :rtype: dict
    """
    return self.edges.pop(edge, None)


def remove_edge_attribute(self, edge, attribute_name):
    """
    Removes the first attribute named **attribute_name** from an edge

    :param edge: Edge object or edge id
    :type edge: int or edge dict with @id attribute
    :param attribute_name: Attribute name
    :type attribute_name: str
    :return: None
    """
    edge_attrs = self.get_edge_attributes(edge)
    if not edge_attrs:
        return
    for position, attr in enumerate(edge_attrs):
        if attr.get('n') == attribute_name:
            del edge_attrs[position]
            return

#==================
# OTHER OPERATIONS
#==================
def get_context(self):
    """
    Decodes the ``@context`` network attribute, which maps namespace
    prefixes to their defining URIs

    Example:

        ``{'pmid': 'https://www.ncbi.nlm.nih.gov/pubmed/'}``

    :return: context object or `None` if there is no ``@context``
             network attribute
    :rtype: dict
    """
    context_attrs = (net_attr for net_attr in self.networkAttributes
                     if net_attr.get('n') == '@context')
    first_context = next(context_attrs, None)
    if first_context is None:
        return None
    # The context is stored as a JSON encoded string
    return json.loads(first_context['v'])
def set_context(self, context):
    """
    Stores the namespace prefix to URI mapping as JSON string in the
    ``@context`` network attribute. A list of dicts is merged into one
    dict first, later entries overriding earlier ones.

    Example:

    .. code-block:: python

        from ndex2.nice_cx_network import NiceCXNetwork

        net = NiceCXNetwork()
        net.set_context({'pmid': 'https://www.ncbi.nlm.nih.gov/pubmed/'})

    :param context: dict where key is name and value is URI or list of
                    those dict objects
    :type context: dict or list
    :raises NDExError: If **context** is not of type :py:class:`list`
                       or :py:class:`dict`
    :return: None
    :rtype: none
    """
    if isinstance(context, list):
        merged_context = {}
        for partial_context in context:
            merged_context.update(partial_context.items())
        encoded = json.dumps(merged_context)
    elif isinstance(context, dict):
        encoded = json.dumps(context)
    else:
        raise NDExError('Context provided is not of type list or dict')

    self.add_network_attribute(name='@context', values=encoded)
def get_metadata(self):
    """
    Gives (name, metadata object) pairs of the network metadata

    :return: Network metadata
    :rtype: Iterator of metadata dict
    """
    # Python 2 dicts need iteritems() to avoid building a list
    if not NiceCXNetwork._is_python_three_or_greater():
        return self.metadata.iteritems()
    return self.metadata.items()


def set_metadata(self, metadata_obj):
    """
    Replaces the network metadata with **metadata_obj**

    .. versionchanged:: 3.5.0
        Now raises more specific :py:class:`~ndex2.exceptions.NDExError`
        if **metadata_obj** is not of type :py:class:`dict`

    :param metadata_obj: Dict of metadata objects
    :type metadata_obj: dict
    :raises NDExError: If **metadata_obj** is not of type `dict`
    :return: None
    :rtype: none
    """
    if not isinstance(metadata_obj, dict):
        raise NDExError('Set metadata input was not of type <dict>')
    self.metadata = metadata_obj


def get_opaque_aspect_table(self):
    """
    Gives direct access to the table of opaque aspects

    :return: dict of aspect name to aspect elements
    :rtype: dict
    """
    return self.opaqueAspects
def get_opaque_aspect(self, aspect_name):
    """
    Looks up the elements of the opaque aspect named **aspect_name**

    :param aspect_name: the name of the aspect to retrieve.
    :type aspect_name: string
    :return: Opaque aspect or `None` if no aspect has that name
    :rtype: list of aspect elements
    """
    aspect_table = self.opaqueAspects
    return aspect_table.get(aspect_name)
def set_opaque_aspect(self, aspect_name, aspect_elements):
    """
    Sets the aspect named **aspect_name** to the given elements. A dict
    is wrapped in a list. Passing `None` as **aspect_elements** removes
    the aspect.

    .. versionchanged:: 3.5.0
        Fixed bug where passing `None` in `aspect_elements` did **NOT**
        remove aspect. Code also now raises
        :py:class:`~ndex2.exceptions.NDExError` if input values are invalid

    .. code-block:: python

        from ndex2.nice_cx_network import NiceCXNetwork

        net = NiceCXNetwork()

        # to set an opaque aspect
        net.set_opaque_aspect('foo', [{'data': 'val'}])

        # to remove an opaque aspect named 'foo'
        net.set_opaque_aspect('foo', None)

    :param aspect_name: Name of the aspect
    :type aspect_name: str
    :param aspect_elements: Aspect element
    :type aspect_elements: list of dict or dict
    :raises NDExError: If `aspect_name` is `None`, or if
                       `aspect_elements` is not `None`,
                       :py:class:`dict`, or :py:class:`list`
    :return: None
    :rtype: none
    """
    if aspect_name is None:
        raise NDExError('aspect_name is None')

    if aspect_elements is None:
        self.remove_opaque_aspect(aspect_name)
    elif isinstance(aspect_elements, list):
        self.opaqueAspects[aspect_name] = aspect_elements
    elif isinstance(aspect_elements, dict):
        self.opaqueAspects[aspect_name] = [aspect_elements]
    else:
        raise NDExError('Provided aspect for ' + aspect_name +
                        ' is not of type <list or dict>')
def remove_opaque_aspect(self, aspect_name):
    """
    Removes the given aspect from the opaque aspects collection.
    Unknown names are ignored.

    :param aspect_name: The opaque aspect name
    :type aspect_name: str
    :return: None
    :rtype: None
    """
    self.opaqueAspects.pop(aspect_name, None)


def get_opaque_aspect_names(self):
    """
    Get the names of all opaque aspects

    :return: Keys of the opaque aspects table
    :rtype: list of strings
    """
    return self.opaqueAspects.keys()


# TODO - determine if this is useful
def get_edge_attribute_element(self, edge, attr_name):
    """
    Finds the first attribute named **attr_name** on an edge.

    Edges and attributes are stored as dicts, so both are read as
    dicts here. Objects exposing ``get_id()`` / ``get_name()`` are
    still accepted for backward compatibility.

    :param edge: Edge id, edge dict with ``@id`` key, or object
                 with ``get_id()``
    :param attr_name: Attribute name
    :type attr_name: str
    :return: Attribute element or `None` if the edge has no such attribute
    """
    if isinstance(edge, dict):
        edge_id = edge.get('@id')
    elif hasattr(edge, 'get_id'):
        edge_id = edge.get_id()
    else:
        edge_id = edge

    # Edges without attributes have no entry in the table
    for attr in self.edgeAttributes.get(edge_id) or ():
        if isinstance(attr, dict):
            current_name = attr.get('n')
        else:
            current_name = attr.get_name()
        if current_name == attr_name:
            return attr
    return None


def get_edge_attributes_by_id(self, id):
    """
    :param id: Edge id
    :return: Attribute list of the edge or `None` if it has none
    """
    return self.edgeAttributes.get(id)


def get_node_associated_aspects(self):
    """
    :return: Table of node associated aspects
    :rtype: dict
    """
    return self.nodeAssociatedAspects


def get_edge_associated_aspects(self):
    """
    :return: Table of edge associated aspects
    :rtype: dict
    """
    return self.edgeAssociatedAspects


def get_node_associated_aspect(self, aspectName):
    """
    :param aspectName: Name of the node associated aspect
    :return: The aspect or `None` if the name is unknown
    """
    return self.nodeAssociatedAspects.get(aspectName)


def get_edge_associated_aspect(self, aspectName):
    """
    :param aspectName: Name of the edge associated aspect
    :return: The aspect or `None` if the name is unknown
    """
    return self.edgeAssociatedAspects.get(aspectName)


def get_provenance(self):
    """
    :return: Provenance of the network
    :rtype: list
    """
    return self.provenance


def get_missing_nodes(self):
    """
    :return: Table of missing nodes
    :rtype: dict
    """
    return self.missingNodes


def get_edge_citations(self):
    """
    :return: Table of edge id to citation ids
    :rtype: dict
    """
    return self.edgeCitations


def get_node_citations(self):
    """
    :return: Table of node id to citation ids
    :rtype: dict
    """
    return self.nodeCitations


def _get_visual_properties_aspect(self):
    """
    Gets the visual Properties aspect (an opaque aspect)
    by first looking for
    :py:const:`~.NiceCXNetwork.CY_VISUAL_PROPERTIES`
    aspect and if not found then
    :py:const:`~.NiceCXNetwork.VISUAL_PROPERTIES` aspect.

    :return: visual properties aspect or None if not found
    """
    opaque_aspect_names = self.get_opaque_aspect_names()
    # Order matters: the cy-prefixed aspect takes precedence
    for candidate in (NiceCXNetwork.CY_VISUAL_PROPERTIES,
                      NiceCXNetwork.VISUAL_PROPERTIES):
        if candidate in opaque_aspect_names:
            return self.get_opaque_aspect(candidate)
    return None


def _remove_node_and_edge_specific_visual_properties(self, vis_aspect):
    """
    Builds a copy of **vis_aspect** without the entries whose
    :py:const:`~.NiceCXNetwork.PROPERTIES_OF` value is
    :py:const:`~.NiceCXNetwork.PROPS_OF_NODES` or
    :py:const:`~.NiceCXNetwork.PROPS_OF_EDGES`

    :param vis_aspect: visual properties aspect
    :type vis_aspect: list
    :return: New filtered list, or `None` if **vis_aspect** is `None`
    :rtype: list
    """
    if vis_aspect is None:
        return vis_aspect

    specific = (NiceCXNetwork.PROPS_OF_NODES, NiceCXNetwork.PROPS_OF_EDGES)
    return [entry for entry in vis_aspect
            if not (NiceCXNetwork.PROPERTIES_OF in entry and
                    entry[NiceCXNetwork.PROPERTIES_OF] in specific)]
def apply_style_from_network(self, nicecxnetwork):
    """
    Copies the Cytoscape visual properties of **nicecxnetwork** onto
    this network, leaving out the node and edge specific entries.
    The style is pulled from
    :py:const:`~.NiceCXNetwork.CY_VISUAL_PROPERTIES` or
    :py:const:`~.NiceCXNetwork.VISUAL_PROPERTIES`

    :param nicecxnetwork: Network to extract style from
    :type nicecxnetwork: :py:class:`~.NiceCXNetwork`
    :raises TypeError: If object passed in is NOT a
                       :py:class:`~.NiceCXNetwork` object or if
                       object is None
    :raises NDExError: If :py:class:`~.NiceCXNetwork` does not have
                       any visual styles
    :return: None
    :rtype: None
    """
    if nicecxnetwork is None:
        raise TypeError('Object passed in is None')
    if not isinstance(nicecxnetwork, NiceCXNetwork):
        raise TypeError('Object passed in is not NiceCXNetwork')

    source_style = nicecxnetwork._get_visual_properties_aspect()
    if source_style is None:
        raise NDExError('No visual style found in network')

    self._set_visual_properties_aspect(
        self._remove_node_and_edge_specific_visual_properties(source_style))
def _delete_deprecated_visual_properties_aspect(self):
    """
    If found removes deprecated
    :py:const:`~.NiceCXNetwork.VISUAL_PROPERTIES`
    from opaque aspects. A matching entry in the metadata is only
    reported with a warning; it is not removed.

    :return: None
    :rtype: None
    """
    deprecated_name = NiceCXNetwork.VISUAL_PROPERTIES

    opaque_aspect_names = self.get_opaque_aspect_names()
    if opaque_aspect_names is not None and \
            deprecated_name in opaque_aspect_names:
        self.logger.debug(deprecated_name +
                          ' in opaque aspect. Removing entry')
        self.remove_opaque_aspect(deprecated_name)

    if deprecated_name in self.metadata:
        self.logger.warning(deprecated_name +
                            ' found in metadata. this should'
                            ' be removed')


def _set_visual_properties_aspect(self, visual_props_aspect):
    """
    Replaces existing visual properties with data passed in.
    The deprecated visual properties aspect is dropped, the new data
    is stored as :py:const:`~.NiceCXNetwork.CY_VISUAL_PROPERTIES`
    aspect and a metadata entry for that aspect is written.

    :param visual_props_aspect: visual properties aspect
    :type visual_props_aspect: list
    :raises TypeError: If **visual_props_aspect** is `None`
    :return: None
    """
    if visual_props_aspect is None:
        raise TypeError('Visual Properties aspect is None')

    target_name = NiceCXNetwork.CY_VISUAL_PROPERTIES

    self._delete_deprecated_visual_properties_aspect()
    self.set_opaque_aspect(target_name, visual_props_aspect)

    self.metadata[target_name] = {'name': target_name,
                                  'elementCount': len(visual_props_aspect),
                                  'version': "1.0",
                                  'consistencyGroup': 1,
                                  'properties': []}
def apply_template(self, server, uuid, username=None, password=None):
    """
    Fetches the visual properties aspect(s) of the network **uuid** on
    **server** and makes them the visual properties of this network.
    This allows the use of networks formatted in Cytoscape as templates
    to apply visual styles to other networks.

    .. versionchanged:: 3.5.0
        Fixed bug where style from template was appended instead of
        replacing the existing style. In most cases, method now
        raises :py:class:`~ndex2.exceptions.NDExError` and subclasses
        instead of more generic :py:class:`Exception`

    .. code-block:: python

        from ndex2.nice_cx_network import NiceCXNetwork

        nice_cx = NiceCXNetwork()
        nice_cx.apply_template('public.ndexbio.org',
                               '51247435-1e5f-11e8-b939-0ac135e8bacf')

    :param server: server host name (i.e. public.ndexbio.org)
    :type server: str
    :param uuid: uuid of the styled network
    :type uuid: str
    :param username: username (optional - used when accessing
                     private networks)
    :type username: str
    :param password: password (optional - used when accessing
                     private networks)
    :type password: str
    :raises NDExError: Raised if *server* or *uuid* not set or if
                       metaData is not found in the network specified
                       by *uuid* or some other server error
    :raises NDExUnauthorizedError: If credentials not authorized to
                                   access network specified by *uuid*
    :raises NDExNotFoundError: If network with *uuid* not found
    :return: None
    :rtype: None
    """
    missing_params = [label for label, value in (('server', server),
                                                 ('uuid', uuid))
                      if not value]
    if len(missing_params) > 0:
        raise NDExError(', '.join(missing_params) +
                        ' not specified in apply_template')

    # ===================
    # METADATA
    # ===================
    metadata_return = self.get_aspect(uuid, NiceCXNetwork.META_DATA,
                                      server, username, password)
    if metadata_return is None:
        raise NDExError('Template not found %s.' % uuid)

    available_aspects = [entry.get('name') for entry in metadata_return]

    # =======================
    # ADD VISUAL PROPERTIES
    # =======================
    for aspect_name in available_aspects:
        is_visual = (NiceCXNetwork.VISUAL_PROPERTIES in aspect_name or
                     NiceCXNetwork.CY_VISUAL_PROPERTIES in aspect_name)
        if not is_visual:
            continue
        template_style = self.get_aspect(uuid, aspect_name, server,
                                         username, password)
        self._set_visual_properties_aspect(template_style)
def get_frag_from_list_by_key(self, cx, key):
    """
    Returns the value stored under **key** in the first element of
    **cx** that contains **key**

    :param cx: iterable of containers (e.g. CX aspect dicts) to search
    :param key: key to look for in each element
    :return: value for **key** from first matching element or an empty
             list if no element contains **key**
    """
    matching_values = (fragment[key] for fragment in cx if key in fragment)
    return next(matching_values, [])
def to_pandas_dataframe(self, dataconverter=None,
                        include_attributes=False):
    """
    Network edges exported as a :py:class:`pandas.DataFrame`

    .. versionchanged:: 3.5.0
        Added **include_attributes** and **dataconverter** parameters

    The following columns will be added to the
    :py:class:`pandas.DataFrame`:

    * **source** - Name of edge source node
    * **interaction** - Interaction between source and target node
    * **target** - Name of edge target node

    If **include_attributes** parameter is set to ``True`` then:

    All edge attributes will be also added as separate columns with
    same name.

    Attributes on **source** node will be added as columns with
    ``source_`` prefixed to name.

    Attributes on **target** node will be added as columns with
    ``target_`` prefixed to name.

    .. note::

        Values will be converted based on CX data types. See
        :py:class:`~ndex2.util.PandasDataConverter` for information
        on how conversion is performed

    .. code-block:: python

        from ndex2.nice_cx_network import NiceCXNetwork

        net = NiceCXNetwork()
        node_one = net.create_node('node1')
        node_two = net.create_node('node2')

        net.set_node_attribute(node_one, 'weight', 0.5, type='double')
        net.set_node_attribute(node_two, 'weight', 0.2, type='double')

        edge_one = net.create_edge(edge_source=node_one,
                                   edge_target=node_two,
                                   edge_interaction='binds')
        net.set_edge_attribute(edge_one, 'edgelabel', 'an edge')
        df = net.to_pandas_dataframe(include_attributes=True)

        # df is now a pandas dataframe
        print(df.head())

    Output from above code block:

    .. code-block:: python

            source interaction target edgelabel  target_weight  source_weight
          0  node1       binds  node2   an edge            0.2            0.5

    .. note::

        This method only processes nodes, edges, node attributes and
        edge attributes, but not network attributes or other aspects

    :param dataconverter: Object that converts CX data values to native
                          data types. If ``None`` (the default) a new
                          :py:class:`~ndex2.util.PandasDataConverter`
                          is created when attributes are requested
    :type dataconverter: :py:class:`~ndex2.util.DataConverter`
    :param include_attributes: If `True` then edge attributes are added
                               to :py:class:`pandas.DataFrame`,
                               otherwise only **source**, **target**,
                               and **interaction** are added
    :type include_attributes: bool
    :raises NDExInvalidParameterError: If **include_attributes** is
                                       not ``None`` or a
                                       :py:class:`bool`
    :return: Edge table with attributes
    :rtype: :py:class:`pandas.DataFrame`
    """
    if include_attributes is not None and \
            not isinstance(include_attributes, bool):
        raise NDExInvalidParameterError('include_attributes must be '
                                        'None or a bool')

    with_attributes = include_attributes is True

    # Create the default converter lazily so it is not built at
    # function definition time and is only made when actually needed
    if with_attributes and dataconverter is None:
        dataconverter = PandasDataConverter()

    rows = []
    edge_attr_names = set()
    node_attr_names = set()

    for edge_id, edge in self.edges.items():
        row = {}
        if with_attributes:
            # edge attributes keep their own name as the column name
            for attr in self.edgeAttributes.get(edge_id) or ():
                attr_name = attr.get('n')
                row[attr_name] = dataconverter.convert_value(attr.get('v'),
                                                             attr.get('d'))
                edge_attr_names.add(attr_name)

            # node attributes are prefixed by the role of the node on
            # this edge; the column name always comes from the attribute
            # being processed (the original used the source loop's
            # variable while processing target attributes)
            for prefix, node_id in (('source_', edge.get('s')),
                                    ('target_', edge.get('t'))):
                for attr in self.nodeAttributes.get(node_id) or ():
                    column = prefix + attr.get('n')
                    row[column] = dataconverter.convert_value(attr.get('v'),
                                                              attr.get('d'))
                    node_attr_names.add(column)

        # core columns are written last so they win over any attribute
        # that happens to share their name
        row.update(source=self.nodes.get(edge.get('s')).get('n'),
                   target=self.nodes.get(edge.get('t')).get('n'),
                   interaction=edge.get('i'))
        rows.append(row)

    df_columns = ['source', 'interaction', 'target']
    if with_attributes:
        df_columns = df_columns + list(edge_attr_names) + \
            list(node_attr_names)

    return pd.DataFrame(rows, columns=df_columns)
def add_metadata_stub(self, aspect_name):
    """
    Looks up the metadata entry for **aspect_name** and discards the
    result; nothing is added or modified.

    The stub creation that once followed the lookup is disabled, so
    this method is currently a no-op kept for callers that still
    invoke it.

    :param aspect_name: name of aspect
    :type aspect_name: str
    :return: None
    """
    # lookup retained so behavior (including any error raised by the
    # lookup itself) is unchanged
    self.metadata.get(aspect_name)
def to_cx_stream(self):
    """
    Serializes the CX for this network to JSON and wraps it in an
    in-memory byte stream, suitable for posting to endpoints that
    accept streaming inputs

    :return: The CX stream representation of this network.
    :rtype: io.BytesIO
    """
    cx_document = self.to_cx()

    running_python_three = sys.version_info.major == 3
    if running_python_three:
        serialized = json.dumps(cx_document)
        return io.BytesIO(serialized.encode('utf-8'))

    # Python 2 path: json.dumps already yields a byte string; if the
    # document holds non UTF-8 bytes retry assuming latin-1
    stream = None
    try:
        stream = io.BytesIO(json.dumps(cx_document))
    except UnicodeDecodeError:
        print("Detected invalid encoding. Trying latin-1 encoding.")
        stream = io.BytesIO(json.dumps(cx_document, encoding="latin-1"))
        print("Success")
    except Exception as unexpected:
        print(unexpected.message)
    return stream
def upload_to(self, server=None, username=None, password=None,
              user_agent='', client=None):
    """
    Upload this network as a new network on NDEx server.

    .. versionchanged:: 3.4.0
        This method was switched to named arguments and the server and
        account credentials can be passed in one of two ways.

        Option 1) Set **username** and **password** parameters.

        Option 2) Set **client** parameter with valid
        :py:class:`~ndex2.client.Ndex2` object

    .. note::

        If **client** parameter is set, **username**, **password**,
        and **server** parameters are ignored

    Example:

    .. code-block:: python

        import ndex2
        nice_cx = ndex2.nice_cx_network.NiceCXNetwork()
        nice_cx.create_node('foo')

        # using production NDEx server
        nice_cx.upload_to(username=user_var,
                          password=password_var)

        # if one needs to use alternate NDEx server
        nice_cx.upload_to(server='public.ndexbio.org',
                          username=username_var,
                          password=password_var)

        # Option 2, create Ndex2 client object
        ndex_client = ndex2.client.Ndex2(username=username_var,
                                         password=password_var)

        # using NDEx client object for connection
        nice_cx.upload_to(client=ndex_client)

    :param server: The NDEx server to upload the network to. Leaving
                   unset or `None` will use production
    :type server: str
    :param username: The username of the account to store the network.
    :type username: str
    :param password: The password for the account.
    :type password: str
    :param user_agent: String to append to User-Agent field sent to
                       NDEx REST service
    :type user_agent: str
    :param client: NDEx2 object with valid credentials. If set
                   **server**, **username**, and **password**
                   parameters will be ignored.
    :type client: :py:class:`~ndex2.client.Ndex2`
    :return: Value returned by the client's ``save_new_network`` call
             for the newly created network
    :rtype: str
    """
    # A caller supplied client takes precedence over server/credentials
    ndex = client if client is not None else Ndex2(server, username,
                                                   password,
                                                   user_agent=user_agent)
    cx_document = self.to_cx()
    return ndex.save_new_network(cx_document)
def update_to(self, uuid, server=None, username=None, password=None,
              user_agent='', client=None):
    """
    Replace the network on NDEx server with matching NDEx `uuid` with
    this network.

    .. versionchanged:: 3.4.0
        This method was switched to named arguments and the server and
        account credentials can be passed in one of two ways.

        Option 1) Set **username** and **password** parameters.

        Option 2) Set **client** parameter with valid
        :py:class:`~ndex2.client.Ndex2` object

    .. note::

        If **client** parameter is set, **username**, **password**,
        and **server** parameters are ignored.

    Example:

    .. code-block:: python

        import ndex2
        nice_cx = ndex2.nice_cx_network.NiceCXNetwork()
        nice_cx.create_node('foo')

        # using production NDEx server
        nice_cx.update_to('2ec87c51-c349-11e8-90ac-525400c25d22',
                          username=user_var,
                          password=password_var)

        # if one needs to use alternate NDEx server
        nice_cx.update_to('2ec87c51-c349-11e8-90ac-525400c25d22',
                          server='public.ndexbio.org',
                          username=username_var,
                          password=password_var)

        # Option 2, create Ndex2 client object
        ndex_client = ndex2.client.Ndex2(username=username_var,
                                         password=password_var)

        # using NDEx client object for connection
        nice_cx.update_to('2ec87c51-c349-11e8-90ac-525400c25d22',
                          client=ndex_client)

    :param uuid: UUID of the network on NDEx.
    :type uuid: str
    :param server: The NDEx server to upload the network to. Leaving
                   unset or `None` will use production.
    :type server: str
    :param username: The username of the account to store the network.
    :type username: str
    :param password: The password for the account.
    :type password: str
    :param user_agent: String to append to User-Agent field sent to
                       NDEx REST service
    :type user_agent: str
    :param client: NDEx2 object with valid credentials. If set
                   **server**, **username**, and **password**
                   parameters will be ignored.
    :type client: :py:class:`~ndex2.client.Ndex2`
    :raises IndexError: If the CX generated for this network is empty
    :return: Value returned by the client's ``update_cx_network`` call
    :rtype: str
    """
    cx = self.to_cx()

    # A caller supplied client takes precedence over server/credentials
    ndex = client if client is not None else Ndex2(server, username,
                                                   password,
                                                   user_agent=user_agent)

    if len(cx) == 0:
        raise IndexError("Cannot save empty CX. Please provide a "
                         "non-empty CX document.")

    # The document must end with a populated status aspect
    success_status = {"error": "", "success": True}
    tail = cx[-1]
    if tail is not None:
        status = tail.get('status')
        if status is None:
            # No STATUS element in the array. Append a new status
            cx.append({"status": [success_status]})
        elif len(status) < 1:
            # STATUS element found, but the status was empty
            status.append(success_status)

    payload = json.dumps(cx)
    if sys.version_info.major == 3:
        payload = payload.encode('utf-8')

    return ndex.update_cx_network(io.BytesIO(payload), uuid)
def _get_node_and_edge_items(self):
    """
    Gets key/value iterables for the nodes and edges of this network,
    hiding the difference between the Python 2 and Python 3 dict API.

    Under Python 3 or newer ``items()`` is used, otherwise
    ``iteritems()`` is used.

    :return: tuple (node iterator, edge iterator)
    :rtype: tuple
    """
    if not NiceCXNetwork._is_python_three_or_greater():
        return self.nodes.iteritems(), self.edges.iteritems()
    return self.nodes.items(), self.edges.items()
def to_networkx(self, mode='legacy'):
    """
    Returns a NetworkX ``Graph()`` object or one of its subclasses
    based on the network. The `mode` parameter dictates how the
    translation occurs.

    This method currently supports the following mode values:

    .. warning::

        **legacy** mode has known bugs when networkx 2.0+ or
        greater is installed. See the description on **legacy** mode
        below for more information.

    **Modes:**

    **legacy:**

    If mode set to **legacy** then this method will behave as it has
    for all versions of NDEx2 Python Client 3.1.0 and earlier, which
    varies depending on version of networkx installed as described
    here:

    For networkx 2.0 and greater:
    (see :class:`LegacyNetworkXVersionTwoPlusFactory`)

    For older versions of networkx the following class is used
    with the `legacymode` parameter set to `True`:
    (see :class:`DefaultNetworkXFactory`)

    **default:**

    If mode is **default** or None then this method uses
    :class:`DefaultNetworkXFactory` regardless of networkx installed
    with `legacymode` set to `False`

    .. note::

        ``default`` mode is the preferred mode to use

    Examples:

    .. code-block:: python

        # returns networkx graph using improved converter
        graph = nice_cx.to_networkx(mode='default')

        # returns networkx graph using legacy implementation
        graph = nice_cx.to_networkx(mode='legacy')

    :param mode: Since translation to networkx can be done in many
                 ways this mode lets the caller dictate the method.
    :type mode: string
    :raises NDExError: If `mode` is not None, 'legacy', or 'default'
    :return: Networkx graph
    :rtype: :class:`networkx.Graph` or :class:`networkx.MultiGraph`
    """
    if mode is None or mode == 'default':
        return DefaultNetworkXFactory().get_graph(self)

    if mode != 'legacy':
        raise NDExError(str(mode) + ' is not a valid mode')

    # legacy mode: the converter depends on the installed networkx
    if NetworkXFactory.get_networkx_major_version() >= 2.0:
        factory = LegacyNetworkXVersionTwoPlusFactory()
    else:
        factory = DefaultNetworkXFactory(legacymode=True)
    return factory.get_graph(self)
def get_summary(self):
    """
    .. deprecated:: 3.3.2
        This method has been deprecated. Please use
        :func:`print_summary`

    Always raises; the summary-building code that used to follow the
    ``raise`` could never execute and has been removed.

    :raises Warning: Always, to direct callers to
                     :func:`print_summary`
    """
    raise Warning('get_summary() is deprecated. '
                  'Please use print_summary() instead')
def print_summary(self):
    """
    Prints a short summary of this network to standard out: its name
    (``Untitled`` when no name is set), the number of nodes and edges,
    and the total number of node and edge attributes.

    :return: None, the summary is printed and not returned
    """
    node_attr_total = sum(len(attrs)
                          for attrs in self.nodeAttributes.values())
    edge_attr_total = sum(len(attrs)
                          for attrs in self.edgeAttributes.values())

    network_name = self.get_name() or 'Untitled'

    summary_lines = [
        'Name: ' + network_name,
        'Nodes: ' + str(len(self.nodes)),
        'Edges: ' + str(len(self.edges)),
        'Node Attributes: ' + str(node_attr_total),
        'Edge Attributes: ' + str(edge_attr_total),
    ]
    # every line, including the last, is newline terminated
    print('\n'.join(summary_lines) + '\n')
def __str__(self):
    """
    Gets a two line description with the node and edge counts

    :return: string of form ``nodes: <count> \\n edges: <count>``
    :rtype: str
    """
    node_total = len(self.nodes)
    edge_total = len(self.edges)
    return 'nodes: %d \n edges: %d' % (node_total, edge_total)
def to_cx(self, log_to_stdout=True):
    """
    Return the CX corresponding to the network.

    .. versionchanged:: 3.5.0
        Added **log_to_stdout** param which lets caller silence
        print statement *Generating CX*

    The result starts with the ``numberVerification`` aspect, followed
    by ``metaData`` (when any metadata exists), the populated core
    aspects, the opaque aspects and finally a ``status`` aspect.
    Generating the aspects also refreshes ``self.metadata``.

    :param log_to_stdout: If ``True`` then code will output to
                          standard out *Generating CX*
    :type log_to_stdout: bool
    :return: CX representation of the network
    :rtype: CX (list of dict aspects)
    """
    output_cx = [{"numberVerification": [{"longNumber": 281474976710655}]}]

    if log_to_stdout is True:
        print('Generating CX')

    # Aspect names double as attribute names on this object; only
    # non-empty aspects are written and the order below is the order
    # they appear in the output
    ordered_aspects = ('nodes', 'edges', 'networkAttributes',
                       'nodeAttributes', 'edgeAttributes', 'citations',
                       'nodeCitations', 'edgeCitations', 'supports',
                       'edgeSupports')
    for aspect_name in ordered_aspects:
        if getattr(self, aspect_name):
            output_cx.append(self.generate_aspect(aspect_name))

    for oa_name in self.opaqueAspects or ():
        oa_content = self.opaqueAspects[oa_name]
        if isinstance(oa_content, bytes):
            # raw bytes are emitted as a single decoded string element
            output_cx.append({oa_name: [oa_content.decode('ascii')]})
        else:
            output_cx.append({oa_name: oa_content})

        # keep metadata for the opaque aspect in sync with its content
        oa_size = len(oa_content)
        existing_md = self.metadata.get(oa_name)
        if existing_md:
            existing_md['elementCount'] = oa_size
        else:
            self.metadata[oa_name] = {
                'name': oa_name,
                'elementCount': oa_size,
                'idCounter': oa_size + 1,
                'properties': []
            }

    if self.metadata:
        # metaData goes right after numberVerification
        output_cx.insert(1, {'metaData': list(self.metadata.values())})

    if output_cx[-1].get('status') is None:
        output_cx.append({'status': [{'error': '', 'success': True}]})

    return output_cx
def generate_aspect(self, aspect_name):
    """
    Builds the CX fragment for the aspect named **aspect_name** and
    stores a fresh metadata entry for it in ``self.metadata``.

    Core aspects stored as dicts are flattened into a list of their
    values (lists found as values are expanded element by element) and
    core aspects stored as lists are emitted as is. All other aspects
    must be dicts and are emitted as ``{'po': [key], 'supports': [...]}``
    for ``edgeSupports`` or ``{'po': [key], 'citations': [...]}``
    otherwise.

    :param aspect_name: name of aspect to generate
    :type aspect_name: str
    :raises Exception: If a non core aspect is not stored as a dict
    :return: ``{aspect_name: [elements]}`` or None if **aspect_name**
             is not a core aspect and no aspect object exists for it
    :rtype: dict
    """
    core_aspect = ['nodes', 'edges', 'networkAttributes',
                   'nodeAttributes', 'edgeAttributes', 'metaData',
                   'citations', 'supports']
    aspect_element_array = []
    element_count = 0
    use_this_aspect = self.string_to_aspect_object(aspect_name)

    if aspect_name in core_aspect:
        # =============================
        # PROCESS CORE ASPECTS FIRST
        # =============================
        if isinstance(use_this_aspect, dict):
            if aspect_name in ('nodes', 'edges'):
                aspect_element_array = list(use_this_aspect.values())
            else:
                for asp in use_this_aspect.values():
                    if isinstance(asp, list):
                        aspect_element_array.extend(asp)
                    else:
                        aspect_element_array.append(asp)
            element_count = len(aspect_element_array)
        elif isinstance(use_this_aspect, list):
            aspect_element_array = use_this_aspect
            element_count = len(use_this_aspect)
    else:
        # ===========================
        # PROCESS NON-CORE ASPECTS
        # ===========================
        if use_this_aspect is None:
            return None
        if not isinstance(use_this_aspect, dict):
            raise Exception('Citation was not in json format')

        element_key = 'supports' if aspect_name == 'edgeSupports' \
            else 'citations'
        for k, v in use_this_aspect.items():
            aspect_element_array.append(
                {'po': [k], element_key: v if isinstance(v, list) else [v]})
        element_count = len(aspect_element_array)

    # idCounter is set to the element count, as in the original code
    self.metadata[aspect_name] = {
        'name': aspect_name,
        'elementCount': element_count,
        'idCounter': element_count,
        'version': "1.0",
        'consistencyGroup': 1,
        'properties': []
    }
    return {aspect_name: aspect_element_array}


def generate_metadata_aspect(self):
    """
    Builds a ``metaData`` fragment by calling ``to_cx()`` on every
    value stored in the metadata aspect object.

    NOTE(review): the metadata entries written by
    :py:func:`generate_aspect` are plain dicts without a ``to_cx()``
    method, so this looks like a leftover from an older metadata
    object model -- confirm before relying on it.

    :return: ``{'metaData': [elements]}``
    :rtype: dict
    """
    aspect_element_array = []
    use_this_aspect = self.string_to_aspect_object('metaData')
    if use_this_aspect is not None:
        for element in use_this_aspect.values():
            aspect_element_array.append(element.to_cx())
    return {'metaData': aspect_element_array}


def handle_metadata_update(self, aspect_name):
    """
    Looks up the aspect object for **aspect_name** and discards it;
    no metadata is modified.

    :param aspect_name: name of aspect
    :type aspect_name: str
    :return: None
    """
    self.string_to_aspect_object(aspect_name)


def update_consistency_group(self):
    """
    Sets the consistency group of every metadata element to one more
    than the largest consistency group currently found.

    NOTE(review): this calls ``get_consistency_group()`` and
    ``set_consistency_group()`` on the metadata values, which the plain
    dict entries written by :py:func:`generate_aspect` do not provide
    -- presumably legacy code, confirm before use.

    :return: None
    """
    consistency_group = 1
    if self.metadata:
        for element in self.metadata.values():
            cg = element.get_consistency_group()
            if cg > consistency_group:
                consistency_group = cg

        consistency_group += 1  # bump the consistency group up by one

        for element in self.metadata.values():
            element.set_consistency_group(consistency_group)


def generate_metadata(self, G, unclassified_cx):
    """
    Builds a ``metaData`` aspect from a networkx graph plus several
    attributes of this object.

    NOTE(review): this method reads attributes that are not
    initialized in the visible part of this class
    (``metadata_original``, ``view_id``, ``subnetwork_id``,
    ``support_map``, ``citation_map``, ``pos``, ``unclassified_cx``
    ...) and uses ``G.nodes_iter()`` / ``G.edges_iter()``; it appears
    to be legacy code for an older network class -- confirm before
    relying on it.

    :param G: graph to count nodes, edges and attributes from
    :param unclassified_cx: not used, ``self.unclassified_cx`` is read
                            instead
    :return: list with single dict ``{'metaData': [entries]}``
    :rtype: list
    """
    return_metadata = []
    consistency_group = 1
    if self.metadata_original is not None:
        for mi in self.metadata_original:
            if mi.get("consistencyGroup") is not None:
                if mi.get("consistencyGroup") > consistency_group:
                    consistency_group = mi.get("consistencyGroup")
            else:
                mi['consistencyGroup'] = 0

        consistency_group += 1  # bump the consistency group up by one

    # ========================
    # Nodes metadata
    # ========================
    node_ids = [n[0] for n in G.nodes_iter(data=True)]
    if len(node_ids) < 1:
        node_ids = [0]
    return_metadata.append(
        {
            "consistencyGroup": consistency_group,
            "elementCount": len(node_ids),
            "idCounter": max(node_ids),
            "name": "nodes",
            "properties": [],
            "version": "1.0"
        }
    )

    # ========================
    # Edges metadata
    # ========================
    edge_ids = [e[2] for e in G.edges_iter(data=True, keys=True)]
    if len(edge_ids) < 1:
        edge_ids = [0]
    return_metadata.append(
        {
            "consistencyGroup": consistency_group,
            "elementCount": len(edge_ids),
            "idCounter": max(edge_ids),
            "name": "edges",
            "properties": [],
            "version": "1.0"
        }
    )

    # =============================
    # Network Attributes metadata
    # =============================
    if len(G.graph) > 0:
        return_metadata.append(
            {
                "consistencyGroup": consistency_group,
                "elementCount": len(G.graph),
                "name": "networkAttributes",
                "properties": [],
                "version": "1.0"
            }
        )

    # ===========================
    # Node Attributes metadata
    # ===========================
    attr_count = 0
    for node_id, attributes in G.nodes_iter(data=True):
        for attribute_name in attributes:
            if attribute_name != "name" and attribute_name != "represents":
                attr_count += 1
    if attr_count > 0:
        return_metadata.append(
            {
                "consistencyGroup": consistency_group,
                "elementCount": attr_count,
                "name": "nodeAttributes",
                "properties": [],
                "version": "1.0"
            }
        )

    # ===========================
    # Edge Attributes metadata
    # ===========================
    attr_count = 0
    for s, t, edge_key, a in G.edges(data=True, keys=True):
        if bool(a):
            for attribute_name in a:
                if attribute_name != "interaction":
                    attr_count += 1
    if attr_count > 0:
        return_metadata.append(
            {
                "consistencyGroup": consistency_group,
                "elementCount": attr_count,
                "name": "edgeAttributes",
                "properties": [],
                "version": "1.0"
            }
        )

    # ===========================
    # cyViews metadata
    # ===========================
    if self.view_id is not None:
        return_metadata.append(
            {
                "elementCount": 1,
                "name": "cyViews",
                "properties": [],
                "consistencyGroup": consistency_group
            }
        )

    # ===========================
    # subNetworks metadata
    # ===========================
    if self.subnetwork_id is not None:
        return_metadata.append(
            {
                "elementCount": 1,
                "name": "subNetworks",
                "properties": [],
                "consistencyGroup": consistency_group
            }
        )

    # ===========================
    # networkRelations metadata
    # ===========================
    if self.subnetwork_id is not None and self.view_id is not None:
        return_metadata.append(
            {
                "elementCount": 2,
                "name": "networkRelations",
                "properties": [],
                "consistencyGroup": consistency_group
            }
        )

    # ================================
    # citations and supports metadata
    # ================================
    if len(self.support_map) > 0:
        return_metadata.append(
            {
                "elementCount": len(self.support_map),
                "name": "supports",
                "properties": [],
                "idCounter": max(self.support_map.keys()),
                "consistencyGroup": consistency_group
            }
        )

    if len(self.node_support_map) > 0:
        return_metadata.append(
            {
                "elementCount": len(self.node_support_map),
                "name": "nodeSupports",
                "properties": [],
                "consistencyGroup": consistency_group
            }
        )

    if len(self.edge_support_map) > 0:
        return_metadata.append(
            {
                "elementCount": len(self.edge_support_map),
                "name": "edgeSupports",
                "properties": [],
                "consistencyGroup": consistency_group
            }
        )

    if len(self.citation_map) > 0:
        return_metadata.append(
            {
                "elementCount": len(self.citation_map),
                "name": "citations",
                "properties": [],
                "idCounter": max(self.citation_map.keys()),
                "consistencyGroup": consistency_group
            }
        )

    if len(self.node_citation_map) > 0:
        return_metadata.append(
            {
                "elementCount": len(self.node_citation_map),
                "name": "nodeCitations",
                "properties": [],
                "consistencyGroup": consistency_group
            }
        )

    if len(self.edge_citation_map) > 0:
        return_metadata.append(
            {
                "elementCount": len(self.edge_citation_map),
                "name": "edgeCitations",
                "properties": [],
                "consistencyGroup": consistency_group
            }
        )

    if len(self.function_term_map) > 0:
        return_metadata.append(
            {
                "elementCount": len(self.function_term_map),
                "name": "functionTerms",
                "properties": [],
                "consistencyGroup": consistency_group
            }
        )

    if len(self.reified_edges) > 0:
        return_metadata.append(
            {
                "elementCount": len(self.reified_edges),
                "name": "reifiedEdges",
                "properties": [],
                "consistencyGroup": consistency_group
            }
        )

    # ===========================
    # ndexStatus metadata
    # ===========================
    return_metadata.append(
        {
            "consistencyGroup": consistency_group,
            "elementCount": 1,
            "name": "ndexStatus",
            "properties": [],
            "version": "1.0"
        }
    )

    # ===========================
    # cartesianLayout metadata
    # ===========================
    if self.pos and len(self.pos) > 0:
        return_metadata.append(
            {
                "consistencyGroup": consistency_group,
                "elementCount": len(self.pos),
                "name": "cartesianLayout",
                "properties": [],
                "version": "1.0"
            }
        )

    # ===========================
    # OTHER metadata
    # ===========================
    for asp in self.unclassified_cx:
        try:
            # first key of the aspect; next(iter(...)) works on both
            # Python 2 and 3 unlike iterkeys().next()
            aspect_type = next(iter(asp))
            if aspect_type in ("visualProperties", "cyVisualProperties",
                               "@context"):
                return_metadata.append(
                    {
                        "consistencyGroup": consistency_group,
                        "elementCount": len(asp[aspect_type]),
                        "name": aspect_type,
                        "properties": []
                    }
                )
        except Exception as e:
            # best effort: report the problem and continue with the
            # next aspect (exceptions have no .message on Python 3)
            print(e)

    return [{'metaData': return_metadata}]


def string_to_aspect_object(self, aspect_name):
    """
    Given an aspect name via **aspect_name** this method
    returns the corresponding aspect object

    .. versionchanged:: 3.5.0
        ``@context`` aspect removed

    :param aspect_name: name of aspect
    :type aspect_name: str
    :return: Aspect object or None
    """
    # maps aspect name to the attribute of this object holding it; the
    # attribute is only read for the aspect actually requested
    attribute_by_aspect = {
        'metaData': 'metadata',
        'nodes': 'nodes',
        'edges': 'edges',
        'networkAttributes': 'networkAttributes',
        'nodeAttributes': 'nodeAttributes',
        'edgeAttributes': 'edgeAttributes',
        'citations': 'citations',
        'nodeCitations': 'nodeCitations',
        'edgeCitations': 'edgeCitations',
        'edgeSupports': 'edgeSupports',
        'supports': 'supports',
    }
    attribute_name = attribute_by_aspect.get(aspect_name)
    if attribute_name is None:
        return None
    return getattr(self, attribute_name)


def get_aspect(self, uuid, aspect_name, server, username, password,
               stream=False):
    """
    Gets an aspect of a network on an NDEx server by delegating to
    :py:func:`stream_aspect` when **stream** is truthy and to
    :py:func:`get_stream` otherwise.

    :param uuid: id of network on server
    :param aspect_name: name of aspect to get
    :param server: server to connect to
    :param username: username or None to not authenticate
    :param password: password or None to not authenticate
    :param stream: refers to the response not the request
    :type stream: bool
    :return: whatever the delegated method returns
    """
    if stream:
        return self.stream_aspect(uuid, aspect_name, server,
                                  username, password)
    return self.get_stream(uuid, aspect_name, server, username, password)


def get_stream(self, uuid, aspect_name, server, username, password):
    """
    Gets an aspect of a network from the NDEx REST service as parsed
    JSON.

    :param uuid: Unique id of network in NDEx
    :type uuid: str
    :param aspect_name: aspect to get. If `metaData` then the meta data
                        of the network will be returned
    :type aspect_name: str
    :param server: Server to connect to. If value does not start with
                   http, then http:// is prepended
    :type server: str
    :param username: NDEx username or None to not authenticate
    :type username: str
    :param password: NDEx password or None to not authenticate
    :type password: str
    :raises NDExUnauthorizedError: If server responds with 401
    :raises NDExNotFoundError: If server responds with 404
    :raises NDExError: If server responds with any other status code
                       above 200 or the response is not valid JSON
    :return: parsed JSON of the response, or the value under
             ``metaData`` in that JSON when **aspect_name** is
             ``metaData``
    """
    if not server.lower().startswith('http'):
        server = 'http://' + server

    # if the metaData aspect is the aspect name it means caller just
    # wants the meta data aspect which is obtained by omitting the
    # aspect name from the end of the URL
    url_suffix = '' if aspect_name == 'metaData' else '/' + aspect_name

    s = requests.session()
    try:
        if username and password:
            # add credentials to the session, if available
            s.auth = (username, password)

        aspect_response = s.get(server + '/v2/network/' + uuid +
                                '/aspect' + url_suffix)

        if aspect_response.status_code == 401:
            raise NDExUnauthorizedError(str(aspect_response.text))
        if aspect_response.status_code == 404:
            raise NDExNotFoundError(str(aspect_response.text))
        if aspect_response.status_code > 200:
            raise NDExError(str(aspect_response.text))

        try:
            json_response = aspect_response.json()
        except (ValueError, requests.exceptions.RequestException) as err:
            # ValueError is the common base of the JSON decode errors
            # and, unlike json.decoder.JSONDecodeError, exists on
            # every supported Python version
            raise NDExError('Error parsing JSON from server: ' + str(err))

        if aspect_name == 'metaData':
            return json_response[aspect_name]
        return json_response
    finally:
        # close the session even when the request itself fails
        s.close()


def stream_aspect(self, uuid, aspect_name, server, username, password):
    """
    Gets an aspect of a network from the NDEx REST service. For
    ``metaData`` the parsed meta data is returned, for any other aspect
    the elements are returned lazily via ``ijson``.

    :param uuid: Unique id of network in NDEx
    :type uuid: str
    :param aspect_name: aspect to get
    :type aspect_name: str
    :param server: Server to connect to. If value does not start with
                   http, then http:// is prepended
    :type server: str
    :param username: NDEx username or None to not authenticate
    :type username: str
    :param password: NDEx password or None to not authenticate
    :type password: str
    :return: value under ``metaData`` in the response when
             **aspect_name** is ``metaData``; otherwise an iterator of
             aspect elements, or an empty list if the request failed
             (the failure is printed to standard out)
    """
    # same rule as get_stream() so both accept the same server values
    if not server.lower().startswith('http'):
        server = 'http://' + server

    if aspect_name == 'metaData':
        s = requests.session()
        try:
            if username and password:
                # add credentials to the session, if available
                s.auth = (username, password)
            md_response = s.get(server + '/v2/network/' + uuid + '/aspect')
            return md_response.json().get('metaData')
        finally:
            s.close()

    aspect_url = server + '/v2/network/' + uuid + '/aspect/' + aspect_name
    if username and password:
        # base64.encodestring() was removed in Python 3.9 and needs
        # bytes; b64encode() works on every supported version and does
        # not insert newlines
        credentials = (username + ':' + password).encode('utf-8')
        token = base64.b64encode(credentials).decode('ascii')
        request = Request(aspect_url,
                          headers={"Authorization": "Basic " + token})
    else:
        request = Request(aspect_url)

    try:
        urlopen_result = urlopen(request)
    except HTTPError as e:
        print(e.code)
        return []
    except URLError as e:
        print('Other error')
        print('URL Error %s' % (e.reason,))
        return []

    return ijson.items(urlopen_result, 'item')


def _stringify_node_attributes(self):
    """
    Converts the value of every node attribute to a string in place.
    ``dict`` and ``list`` values are serialized as JSON, any other non
    string value is converted with ``str()``.

    :return: None
    """
    for node_id, node in self.get_nodes():
        node_attrs = self.get_node_attributes(node)
        if node_attrs is None:
            continue
        for attr in node_attrs:
            if isinstance(attr['v'], (dict, list)):
                attr['v'] = json.dumps(attr['v'])
            elif not isinstance(attr['v'], str):
                attr['v'] = str(attr['v'])
class DecimalEncoder(json.JSONEncoder):
    """
    JSON encoder that can additionally serialize
    :py:class:`decimal.Decimal` values and numpy scalar numbers.

    ``Decimal`` and numpy floating point scalars are written as
    ``float`` and numpy integer scalars of any width are written as
    ``int``. Anything else is handed to the base encoder, which raises
    :py:class:`TypeError` for types it does not know.
    """

    def default(self, o):
        """
        Converts **o** to a type the json module can serialize

        :param o: object the json module could not serialize natively
        :raises TypeError: If **o** is not a supported type
        :return: ``float`` or ``int`` equivalent of **o**
        """
        if isinstance(o, (decimal.Decimal, np.floating)):
            return float(o)
        # np.integer covers int64 as well as the other integer widths
        if isinstance(o, np.integer):
            return int(o)
        return super(DecimalEncoder, self).default(o)
class NetworkXFactory(object):
    """
    Base class for subclasses that implement
    a factory that creates :class:`networkx.Graph` objects
    and contains a couple utility methods used by
    implementing factory classes
    """

    def __init__(self):
        """
        Constructor, caches major version of installed networkx
        """
        self._nx_major_version = NetworkXFactory.get_networkx_major_version()
        self._logger = logging.getLogger(__name__)

    @staticmethod
    def get_networkx_major_version(networkx_version=nx.__version__):
        """
        Gets major version of networkx library

        :param networkx_version: raw version of networkx library
        :type networkx_version: str
        :return: major version of networkx assuming it will be in
                 format of MAJOR.MINOR or MAJOR.MINOR.PATCH... or 0
                 if there was a problem
        :rtype: int
        """
        if networkx_version is None:
            return 0

        # everything before the first period is the major version; a
        # version without any period is treated as a problem
        major, period, _ = str(networkx_version).partition('.')
        if not period:
            return 0
        try:
            return int(major)
        except ValueError:
            return 0

    def get_graph(self, nice_cx_network, networkx_graph=None):
        """
        Creates NetworkX Graph object which can be
        one of the multiple types of Graph objects

        :raises NotImplementedError: Subclasses should implement this
                                     method
        :param nice_cx_network: Network to create networkx graph from
        :type nice_cx_network: :py:class:`NiceCXNetwork`
        :param networkx_graph: Empty networkx graph to populate
        :type networkx_graph: :class:`networkx.Graph` or subtype
        :return: networkx Graph object of some type
        :rtype: :class:`networkx.Graph`
        """
        raise NotImplementedError('Must be implemented by sub class')
def copy_cartesian_coords_into_graph(self, nice_cx_network, networkx_graph):
    """
    Examines the `nice_cx_network` extracting the content of the
    opaque aspect :py:const:`~ndex2.constants.CARTESIAN_LAYOUT_ASPECT`

    .. versionchanged:: 3.5.0
        code now inverts value of y coordinate so position is correct
        in networkx

    If data is found in above aspect, then this method iterates
    through the list of values which is assumed to be a list of
    dictionaries of node ids with coordinates as seen here::

        [
          { 'node': <id>, 'x': <x coord>, 'y': <y coord>},
          { 'node': <id>, 'x': <x coord>, 'y': <y coord>},
          .
          .
        ]

    These values (as seen in example above) are stored in the
    `networkx_graph` object as tuples with id of node set as key,
    with the y coordinate negated, like so:

    .. code-block:: python

        networkx_graph.pos[<id from above>] = (<x coord>, -<y coord>)

    If the aspect is missing or empty `networkx_graph` is left
    untouched.

    :param nice_cx_network: Input network
    :type nice_cx_network: :py:class:`NiceCXNetwork`
    :param networkx_graph: Network to append coordinates to
    :type networkx_graph: :class:`networkx.Graph`
    :raises NDExError: If either input parameter is None
    :return: None
    """
    if nice_cx_network is None:
        raise NDExError('Input network is None')
    if networkx_graph is None:
        raise NDExError('Output Graph is None')

    layout = nice_cx_network.get_opaque_aspect(
        constants.CARTESIAN_LAYOUT_ASPECT)
    if not layout:
        return

    networkx_graph.pos = {}
    for entry in layout:
        x_coord = entry.get(constants.LAYOUT_X)
        # y is negated so the position is correct in networkx
        y_coord = -entry.get(constants.LAYOUT_Y)
        networkx_graph.pos[entry.get(constants.LAYOUT_NODE)] = (x_coord,
                                                                y_coord)
def add_network_attributes_from_nice_cx_network(self, nice_cx_network,
                                                networkx_graph):
    """
    Iterates through network attributes of input `nice_cx_network`
    appending the attributes to the graph object passed in, setting
    the values like so:

    .. code-block:: python

        networkx_graph.graph[attribute_name] = attribute_value

    If the value of a network attribute is of type list then the
    values are converted to strings and concatenated into a single
    string separated by commas.

    :param nice_cx_network: Network to extract network attributes from
    :type nice_cx_network: :py:class:`NiceCXNetwork`
    :param networkx_graph: networkx Graph object, should work with any
                           of the types of Graphs ie MultiGraph etc..
    :type networkx_graph: :class:`networkx.Graph`
    :raises NDExError: If either input parameter is None
    :return: None
    """
    if nice_cx_network is None:
        raise NDExError('Input network is None')
    if networkx_graph is None:
        raise NDExError('Output Graph is None')

    for attr_name in nice_cx_network.get_network_attribute_names():
        attribute = nice_cx_network.get_network_attribute(attr_name)
        if attribute is not None:
            value = attribute[constants.NET_ATTR_VALUE]
            if isinstance(value, list):
                # lists are flattened into a comma delimited string
                value = ','.join(str(entry) for entry in value)
            networkx_graph.graph[attr_name] = value
def add_node(self, networkx_graph, nodeid, node_attributes,
             name=None, represents=None):
    """
    Adds node to `graph` dealing with differences between
    networkx 1.x and 2.x+

    :param networkx_graph: networkx graph to add node to
    :type networkx_graph: :class:`networkx.Graph` or one of its
                          subtypes
    :param nodeid: node identifier can be string, int etc.
    :param node_attributes: node attributes to set on the node
    :type node_attributes: dict
    :param name: name of node that is set as attribute with key 'name'
                 on node
    :type name: string
    :param represents: represents value for node that is set as
                       attribute with key 'represents' on node
    :return: None
    """
    # choose the implementation matching the installed networkx
    if self._nx_major_version >= 2:
        add_impl = self._add_node_networkx_two_plus
    else:
        add_impl = self._add_node_networkx_legacy
    add_impl(networkx_graph, nodeid, node_attributes,
             name=name, represents=represents)
def _add_node_networkx_legacy(self, networkx_graph, nodeid, node_attributes,
                              name=None, represents=None):
    """
    Adds node to `networkx_graph` using the legacy calling convention in
    which the attribute dictionary is handed to ``add_node()`` as its
    second positional argument.

    :param networkx_graph: Graph to add node to
    :param nodeid: id of node
    :param node_attributes: iterable of node attribute entries; each entry
                            is read with ``.get()`` using
                            ``constants.NODE_ATTR_NAME`` and
                            ``constants.NODE_ATTR_VALUE``. When empty or
                            None, ``None`` is passed as attribute dictionary
    :param name: value passed to ``add_node()`` as keyword ``name``
    :param represents: if truthy, passed to ``add_node()`` as keyword
                       ``represents``; otherwise the keyword is omitted
    :return: None
    """
    attr_dict = None
    if node_attributes:
        attr_dict = {entry.get(constants.NODE_ATTR_NAME):
                     entry.get(constants.NODE_ATTR_VALUE)
                     for entry in node_attributes}

    extra_kwargs = {'name': name}
    if represents:
        extra_kwargs['represents'] = represents
    networkx_graph.add_node(nodeid, attr_dict, **extra_kwargs)


def _add_node_networkx_two_plus(self, networkx_graph, nodeid, node_attributes,
                                name=None, represents=None):
    """
    Adds node to `networkx_graph` assuming installed version of networkx
    is 2 or greater. The node is created first and its attributes are
    then written through ``networkx_graph.nodes[nodeid]``.

    :param networkx_graph: Graph to add node to
    :type networkx_graph: :class:`networkx.Graph`
    :param nodeid: id of node, could be str, int
    :param node_attributes: node attribute entries usually obtained by
                            invoking
                            :py:func:`NiceCXNetwork.get_node_attributes(nodeid)`
    :param name: if truthy, stored as node attribute 'name'
    :param represents: if truthy, stored as node attribute 'represents'
    :return: None
    """
    networkx_graph.add_node(nodeid)

    for entry in node_attributes or ():
        attr_name = entry.get(constants.NODE_ATTR_NAME)
        networkx_graph.nodes[nodeid][attr_name] = entry.get(constants.NODE_ATTR_VALUE)

    # 'represents' and 'name' are written last, so they override node
    # attributes that happen to carry the same key
    if represents:
        networkx_graph.nodes[nodeid]['represents'] = represents
    if name:
        networkx_graph.nodes[nodeid]['name'] = name
def add_edge(self, networkx_graph, source_node, target_node, attribute_dict):
    """
    Adds edge to `networkx_graph`, hiding the differences between
    networkx 1.x and 2.x+ by delegating to the helper that matches
    ``self._nx_major_version``

    :param networkx_graph: networkx graph to add edge to
    :type networkx_graph: :class:`networkx.Graph` or one of its subtypes
    :param source_node: id of source node
    :param target_node: id of target node
    :param attribute_dict: dictionary of edge attributes
    :type attribute_dict: dict
    :return: None
    """
    if self._nx_major_version >= 2:
        edge_adder = self._add_edge_networkx_two_plus
    else:
        edge_adder = self._add_edge_networkx_legacy
    edge_adder(networkx_graph, source_node, target_node, attribute_dict)
def _add_edge_networkx_two_plus(self, networkx_graph, source_node,
                                target_node, attribute_dict):
    """
    Adds an edge and then copies `attribute_dict` onto it one key at a
    time.

    The value returned by ``networkx_graph.add_edge()`` decides where the
    attributes are written: when it is ``None`` they go to
    ``networkx_graph[source_node][target_node]``; otherwise the returned
    value is used as an additional edge key, i.e.
    ``networkx_graph[source_node][target_node][edge_key]``.

    :param networkx_graph: Graph to add edge to
    :param source_node: id of source node
    :param target_node: id of target node
    :param attribute_dict: edge attributes to set, or None to add the
                           edge without attributes
    :return: None
    """
    new_edge_key = networkx_graph.add_edge(source_node, target_node)
    if attribute_dict is not None:
        for attr_name in attribute_dict:
            edge_data = networkx_graph[source_node][target_node]
            if new_edge_key is not None:
                # graph keeps several edges per node pair
                edge_data = edge_data[new_edge_key]
            edge_data[attr_name] = attribute_dict[attr_name]


def _add_edge_networkx_legacy(self, networkx_graph, source_node,
                              target_node, attribute_dict):
    """
    Adds an edge using the legacy calling convention in which the edge
    attributes are handed to ``add_edge()`` through the ``attr_dict``
    keyword.

    :param networkx_graph: Graph to add edge to
    :param source_node: id of source node
    :param target_node: id of target node
    :param attribute_dict: edge attributes passed as ``attr_dict``
    :return: None
    """
    legacy_kwargs = {'attr_dict': attribute_dict}
    networkx_graph.add_edge(source_node, target_node, **legacy_kwargs)
class DefaultNetworkXFactory(NetworkXFactory):
    """
    Factory that converts a :class:`NiceCXNetwork` into a
    :class:`networkx.Graph` object or one of its subtypes

    For details on implementation see :func:`~get_graph`
    """

    def __init__(self, legacymode=False):
        """
        Constructor

        Note: the parameters in the constructor change behavior of
        :py:func:`~DefaultNetworkXFactory.get_graph`

        :param legacymode: If set to True then
                           :py:func:`~DefaultNetworkXFactory.get_graph`
                           behaves like NDEx2 Python client version 3.1 and
                           earlier in that this method returns a
                           :class:`networkx.Graph` object. ``None`` is
                           treated as ``False``. See
                           :py:func:`~DefaultNetworkXFactory.get_graph`
                           for more information
        :type legacymode: bool
        :raises NDExError: If invalid value is set in `legacymode` parameter
        """
        super(DefaultNetworkXFactory, self).__init__()
        mode = False if legacymode is None else legacymode
        if not isinstance(mode, bool):
            raise NDExError(str(mode) +
                            ' not a valid value for legacymode parameter')
        self._legacymode = mode

    def get_graph(self, nice_cx_network, networkx_graph=None):
        """
        Creates a :class:`networkx.Graph`, or a subtype, object from
        `nice_cx_network` passed in.

        .. warning::

            Converting large networks (10,000+ edges or nodes) may take a
            long time and consume lots of memory.

        The conversion is done as follows:

        Any network attributes are copied to the :class:`networkx.Graph`
        in manner described here:
        :py:func:`~NetworkXFactory.add_network_attributes_from_nice_cx_network`

        For nodes:

        All nodes are added with the node id set to the id or
        :py:const:`~ndex2.constants.NODE_ID` of input network nodes.

        A node attribute named 'name' is set for each node with its value
        set to the value of the 'name' attribute from the input network.

        If 'r' exists on node, the value is added as a node attribute
        named 'represents' (unless `legacymode` is set to `True` in
        constructor)

        All other node attributes are added using the same attribute name
        as found in the input network. The value is directly set as it was
        found in input network (could be single object or list)

        For edges:

        Each edge is added setting the source to the value of
        :py:const:`~ndex2.constants.EDGE_SOURCE` attribute and target set
        as :py:const:`~ndex2.constants.EDGE_TARGET` attribute of input
        network.

        Any edge attributes named
        :py:const:`~ndex2.constants.EDGE_INTERACTION` are renamed
        'interaction' and stored as an attribute for the edge

        .. versionchanged:: 3.5.0
            If the value of an edge attribute is a list then the value is
            set directly in the graph as is as opposed to being converted
            into a comma delimited string

        Coordinates are copied in manner described here:
        :py:func:`~NetworkXFactory.copy_cartesian_coords_into_graph`

        .. warning::

            If **legacymode** is set to True in constructor then:

            * :class:`networkx.Graph` created by this method does **NOT**
              support multiple edges between the same nodes. Extra edges
              encountered are **ignored** and not converted.

            * In addition, the 'r' attribute in the node dict is **NOT**
              copied to the resulting :class:`networkx.Graph` object.

            * `networkx_graph` parameter is ignored

        :param nice_cx_network: Network to extract graph from
        :type nice_cx_network: :class:`NiceCXNetwork`
        :param networkx_graph: Empty networkx graph to populate which is
                               **IGNORED** if `legacymode` is set to True
                               in constructor. If unset and `legacymode`
                               is False in constructor then a
                               :class:`networkx.MultiDiGraph` is created
        :type networkx_graph: :class:`networkx.Graph` or subtype
        :raises NDExError: if input network is None
        :return: Input network converted to networkx Graph
        :rtype: :class:`networkx.Graph` if legacymode is set to True in
                constructor otherwise :class:`networkx.MultiDiGraph` unless
                `networkx_graph` is set in which case `networkx_graph`
                is returned
        """
        if nice_cx_network is None:
            raise NDExError('input network is None')

        # pick the graph object that gets populated
        if self._legacymode is True:
            result = nx.Graph()
        elif networkx_graph is None:
            result = nx.MultiDiGraph()
        else:
            result = networkx_graph

        self.add_network_attributes_from_nice_cx_network(nice_cx_network,
                                                         result)
        self._process_nodes(nice_cx_network, result)
        self._process_edges(nice_cx_network, result)
        self.copy_cartesian_coords_into_graph(nice_cx_network, result)
        return result

    def _process_nodes(self, nice_cx_network, graph):
        """
        Walks the nodes of `nice_cx_network` and adds each one to `graph`
        via :func:`add_node`, using the node id as graph node identifier
        and passing the node attributes unchanged, so an attribute value
        that is a list ends up as a list in the graph.

        'represents' is only read from the node when `legacymode` is
        False; otherwise ``None`` is passed.

        :param nice_cx_network: Network to read nodes from
        :param graph: Graph to add nodes to
        :return: None
        """
        represents = None
        for node_id, node in nice_cx_network.get_nodes():
            if self._legacymode is False:
                represents = node.get(constants.NODE_REPRESENTS)
            node_attrs = nice_cx_network.get_node_attributes(node_id)
            # NOTE(review): the node's name is looked up with
            # NODE_ATTR_NAME; confirm it is the same key nodes use for
            # their name
            node_name = node.get(constants.NODE_ATTR_NAME)
            self.add_node(graph, node_id, node_attrs,
                          name=node_name, represents=represents)

    def _process_edges(self, nice_cx_network, graph):
        """
        Walks the edges of `nice_cx_network` and adds each one to `graph`
        via :func:`add_edge`. The attribute dictionary of an edge starts
        with 'interaction' set to the edge's
        ``constants.EDGE_INTERACTION`` value followed by every edge
        attribute, keyed by its 'n' entry with the 'v' entry as value.

        :param nice_cx_network: Network to read edges from
        :param graph: Graph to add edges to
        :type graph: :class:`networkx.Graph`
        :return: None
        """
        for edge_id, edge in nice_cx_network.get_edges():
            attr_entries = nice_cx_network.get_edge_attributes(edge_id)
            edge_data = {'interaction': edge.get(constants.EDGE_INTERACTION)}
            if attr_entries is not None:
                edge_data.update((entry.get('n'), entry.get('v'))
                                 for entry in attr_entries)
            self.add_edge(graph, edge[constants.EDGE_SOURCE],
                          edge[constants.EDGE_TARGET], edge_data)
class LegacyNetworkXVersionTwoPlusFactory(NetworkXFactory):
    """
    .. deprecated:: 3.2.0
        This implementation contains errors, but is left for backwards
        compatibility of :func:`NiceCXNetwork.to_networkx`

    Converts :class:`NiceCXNetwork` to :class:`networkx.Graph` object
    following logic in legacy NDEx2 Python client when networkx 2.0+
    is installed.

    .. warning::

        This implementation assumes networkx 2.0+ is installed and will
        fail with older versions.

    For conversion details see
    :func:`~LegacyNetworkXVersionTwoPlusFactory.get_graph`
    """

    def __init__(self):
        """
        Constructor
        """
        super(LegacyNetworkXVersionTwoPlusFactory, self).__init__()

    def get_graph(self, nice_cx_network, networkx_graph=None):
        """
        Creates a :class:`networkx.Graph` object from `nice_cx_network`
        passed in.

        .. deprecated:: 3.2.0
            This implementation contains errors, but is left for backwards
            compatibility of :func:`NiceCXNetwork.to_networkx`

        .. warning::

            Converting large networks (10,000+ edges or nodes) may take a
            long time and consume lots of memory.

            This implementation uses node name as ID for nodes, which is
            problematic if multiple nodes share the same name and results
            in invalid mapping of node positions

            :class:`networkx.Graph` created by this method does NOT
            support multiple edges between the same nodes. Extra edges
            encountered are **ignored** and not converted.

        The conversion is done as follows:

        Any network attributes are copied to the :class:`networkx.Graph`
        in manner described here:
        :py:func:`~NetworkXFactory.add_network_attributes_from_nice_cx_network`

        For nodes:

        All nodes are added with the node id set to value of 'n' on node.
        For multiple nodes with same 'n' value behavior is unknown

        If 'r' exists on node, the value is added as a node attribute
        named 'represents'

        All other node attributes are added using the same attribute name
        as found in the input network. The value is directly set as it was
        found in input network (could be single object or list)

        For edges:

        Each edge is added setting the source to the value of 's'
        attribute and target set as 't' attribute of input network.

        Any edge attributes named 'i' are renamed 'interaction' and stored
        as an attribute for the edge

        If the value of an edge attribute is a list then the list values
        are turned into a string separated by a comma and then enclosed by
        double quotes.

        Coordinates are copied in manner described here:
        :py:func:`~NetworkXFactory.copy_cartesian_coords_into_graph`

        :param nice_cx_network: Network to extract graph from
        :type nice_cx_network: :class:`NiceCXNetwork`
        :param networkx_graph: ignored by this implementation
        :type networkx_graph: :class:`networkx.Graph` or subtype
        :return: Input network converted to networkx Graph
        :rtype: :class:`networkx.Graph`
        """
        result = nx.Graph()
        self.add_network_attributes_from_nice_cx_network(nice_cx_network,
                                                         result)
        self._process_nodes(nice_cx_network, result)
        self._process_edges(nice_cx_network, result)
        self.copy_cartesian_coords_into_graph(nice_cx_network, result)
        return result

    def _process_nodes(self, nice_cx_network, graph):
        """
        Adds every node of `nice_cx_network` to `graph`, using the node's
        'n' value as the graph node identifier, and copies the node
        attributes onto it keyed by each attribute's 'n' entry.

        :param nice_cx_network: Network to read nodes from
        :param graph: Graph to add nodes to
        :return: None
        """
        for node_id, node in nice_cx_network.get_nodes():
            node_name = node.get('n')
            graph.add_node(node_name)
            attr_entries = nice_cx_network.get_node_attributes(node_id)
            if not attr_entries:
                continue
            for entry in attr_entries:
                graph.nodes[node_name][entry.get('n')] = entry.get('v')
            # NOTE(review): the flattened source this was restored from
            # does not show whether the 'r' check is nested under the
            # node-attribute check. Nesting is assumed here, meaning
            # 'represents' is only copied for nodes that have node
            # attributes -- confirm against the upstream file.
            represents = node.get('r')
            if represents:
                graph.nodes[node_name]['represents'] = represents

    def _process_edges(self, nice_cx_network, graph):
        """
        Adds every edge of `nice_cx_network` to `graph`. Source and target
        are the 'n' values of the nodes referenced by the edge's 's' and
        't' entries. The edge's 'i' value is stored under 'interaction'
        and each edge attribute is stored under its 'n' entry; list values
        are joined with commas and wrapped in double quotes.

        :param nice_cx_network: Network to read edges from
        :param graph: Graph to add edges to
        :return: None
        """
        for edge_id, edge in nice_cx_network.get_edges():
            source_node = nice_cx_network.get_node(edge.get('s')).get('n')
            target_node = nice_cx_network.get_node(edge.get('t')).get('n')
            graph.add_edge(source_node, target_node)

            attr_entries = nice_cx_network.get_edge_attributes(edge_id)
            graph[source_node][target_node]['interaction'] = edge.get('i')
            if attr_entries is None:
                continue
            for entry in attr_entries:
                attr_value = entry.get('v')
                if isinstance(attr_value, list):
                    attr_value = '"%s"' % ','.join(str(item)
                                                   for item in attr_value)
                graph[source_node][target_node][entry.get('n')] = attr_value