#!/usr/bin/env python
import requests
import json
import logging
from requests_toolbelt import MultipartEncoder
import io
import sys
import decimal
import numpy
from ndex2.version import __version__
from ndex2.exceptions import NDExInvalidCXError
from ndex2.exceptions import NDExUnauthorizedError
from ndex2.exceptions import NDExError
from ndex2.exceptions import NDExUnsupportedCallError
from ndex2.exceptions import NDExInvalidParameterError
from ndex2.exceptions import NDExNotFoundError
try:
from urllib.parse import urljoin
except ImportError:
from urlparse import urljoin
from requests import exceptions as req_except
import time
userAgent = 'NDEx2-Python/' + __version__
DEFAULT_SERVER = "http://public.ndexbio.org"
[docs]class Ndex2(object):
""" A class to facilitate communication with an
`NDEx server <http://ndexbio.org>`_.
If host is not provided it will default to the
`NDEx public server <http://ndexbio.org>`_. UUID is required
"""
USER_AGENT_KEY = 'User-Agent'
VALID_NETWORK_SYSTEM_PROPERTIES = ['showcase', 'visibility',
'index_level', 'readOnly']
def __init__(self, host=None, username=None, password=None,
update_status=False, debug=False, user_agent='',
timeout=30):
"""
Creates a connection to a particular `NDEx server <http://ndexbio.org>`_.
:param host: The URL of the server.
:type host: string
:param username: The username of the NDEx account to use. (Optional)
:type username: string
:param password: The account password. (Optional)
:type password: string
:param update_status: If set to True tells constructor to query
service for status
:type update_status: bool
:param user_agent: String to append to
`User-Agent <https://tools.ietf.org/html/rfc1945#page-46>`_
header sent with all requests to server
:type user_agent: string
:param timeout: The timeout in seconds value for requests to server. This value
is passed to Request calls `Click here for more information
<http://docs.python-requests.org/en/master/user/advanced/#timeouts>`_
:type timeout: float or tuple(float, float)
"""
self.debug = debug
self.version = 1.3
self.status = {}
self.username = username
self.password = password
self.user_agent = user_agent
if self.user_agent is None:
self.user_agent = ''
else:
if len(self.user_agent) > 0:
self.user_agent = ' ' + self.user_agent
self.logger = logging.getLogger(__name__)
self.timeout = timeout
if host is None:
host = DEFAULT_SERVER
elif 'http' not in host:
host = 'http://' + host
if "localhost" in host:
self.host = "http://localhost:8080/ndexbio-rest"
else:
status_url = "/rest/admin/status"
try:
version_url = urljoin(host, status_url)
response = requests.get(version_url,
headers={Ndex2.USER_AGENT_KEY:
userAgent + self.user_agent})
response.raise_for_status()
data = response.json()
prop = data.get('properties')
if prop is not None:
pv = prop.get('ServerVersion')
if pv is not None:
if not pv.startswith('2.'):
raise Exception("This release only supports NDEx 2.x server.")
else:
self.version = pv
self.host = host + "/v2"
else:
self.logger.warning("Warning: This release doesn't fully "
"support 1.3 version of NDEx")
self.version = "1.3"
self.host = host + "/rest"
else:
self.logger.warning("Warning: No properties found. "
"This release doesn't fully "
"support 1.3 version of NDEx")
self.version = "1.3"
self.host = host + "/rest"
except req_except.HTTPError as he:
self.logger.warning("Can't determine server version. " + host +
' Server returned error -- ' + str(he) +
' will assume 1.3 version of NDEx which' +
' is not fully supported by this release')
self.version = "1.3"
self.host = host + "/rest"
# TODO - how to handle errors getting server version...
# create a session for this Ndex
self.s = requests.session()
if username and password:
# add credentials to the session, if available
self.s.auth = (username, password)
if update_status:
self.update_status()
# Base methods for making requests to this NDEx
def set_request_timeout(self, time_in_secs):
"""
Sets request timeout.
`Click here for more information <http://docs.python-requests.org/en/master/user/quickstart/#timeouts>`_
:param time_in_secs: Seconds to wait on a request to the
service before giving up.
:type time_in_secs: int
"""
self.timeout = time_in_secs
def set_debug_mode(self, debug):
self.debug = debug
def debug_response(self, response):
if self.debug:
self.logger.debug("status code: " + str(response.status_code))
if not response.status_code == requests.codes.ok:
self.logger.debug("response text: " + str(response.text))
def _require_auth(self):
"""
:raises NDExUnauthorizedError: If no credentials are found in this object
"""
if not self.s.auth:
raise NDExUnauthorizedError("This method requires user authentication")
def _get_user_agent(self):
"""
Creates string to use for User-Agent header
:return: string containing User-Agent header value
"""
return userAgent + self.user_agent
def _return_response(self, response,
raiseforstatus=True,
returnfullresponse=False,
returnjsonundertry=False):
"""
Given a response from service request
this method returns response.json() if the
headers content-type is application/json otherwise
response.text is returned
:param response: response object from requests.put or requests.post
call
:param returnfullresponse: If True then response object passed in
is returned
:type returnfullresponse: bool
:return: response.json() or response.text unless returnfullresponse
then response passed in is just returned.
"""
self.debug_response(response)
if raiseforstatus:
response.raise_for_status()
if returnfullresponse is True:
return response
if response.status_code == 204:
return ""
if returnjsonundertry is True:
try:
result = response.json()
except ValueError:
result = response.text
return result
if response.headers['content-type'] == 'application/json':
return response.json()
else:
return response.text
def put(self, route, put_json=None):
url = self.host + route
self.logger.debug("PUT route: " + url)
self.logger.debug("PUT json: " + str(put_json))
headers = self.s.headers
headers['Content-Type'] = 'application/json;charset=UTF-8'
headers['Accept'] = 'application/json'
headers[Ndex2.USER_AGENT_KEY] = self._get_user_agent()
if put_json is not None:
response = self.s.put(url, data=put_json, headers=headers,
timeout=self.timeout)
else:
response = self.s.put(url, headers=headers,
timeout=self.timeout)
return self._return_response(response)
def post(self, route, post_json):
url = self.host + route
self.logger.debug("POST route: " + url)
self.logger.debug("POST json: " + post_json)
headers = {'Content-Type': 'application/json',
'Accept': 'application/json,text/plain',
'Cache-Control': 'no-cache',
Ndex2.USER_AGENT_KEY: self._get_user_agent(),
}
response = self.s.post(url, data=post_json, headers=headers,
timeout=self.timeout)
return self._return_response(response)
def delete(self, route, data=None,
raiseforstatus=True,
returnfullresponse=False,
returnjsonundertry=False):
url = self.host + route
self.logger.debug("DELETE route: " + url)
headers = self.s.headers
headers[Ndex2.USER_AGENT_KEY] = userAgent + self.user_agent
headers['Connection'] = 'close'
if data is not None:
response = self.s.delete(url, headers=headers, data=data,
timeout=self.timeout)
else:
response = self.s.delete(url, headers=headers,
timeout=self.timeout)
return self._return_response(response,
raiseforstatus=raiseforstatus,
returnfullresponse=returnfullresponse,
returnjsonundertry=returnjsonundertry)
def get(self, route, get_params=None):
url = self.host + route
self.logger.debug("GET route: " + url)
headers = self.s.headers
headers[Ndex2.USER_AGENT_KEY] = self._get_user_agent()
headers['Connection'] = 'close'
response = self.s.get(url, params=get_params, headers=headers,
timeout=self.timeout)
return self._return_response(response)
# The stream refers to the Response, not the Request
def get_stream(self, route, get_params=None):
url = self.host + route
self.logger.debug("GET stream route: " + url)
headers = self.s.headers
headers[Ndex2.USER_AGENT_KEY] = self._get_user_agent()
response = self.s.get(url, params=get_params, stream=True,
headers=headers, timeout=self.timeout)
return self._return_response(response,
returnfullresponse=True)
# The stream refers to the Response, not the Request
def post_stream(self, route, post_json):
url = self.host + route
self.logger.debug("POST stream route: " + url)
headers = self.s.headers
headers['Content-Type'] = 'application/json'
headers['Accept'] = 'application/json'
headers[Ndex2.USER_AGENT_KEY] = self._get_user_agent()
headers['Connection'] = 'close'
response = self.s.post(url, data=post_json, headers=headers,
stream=True, timeout=self.timeout)
return self._return_response(response,
returnfullresponse=True)
# The Request is streamed, not the Response
def put_multipart(self, route, fields):
url = self.host + route
multipart_data = MultipartEncoder(fields=fields)
self.logger.debug("PUT route: " + url)
headers = {'Content-Type': multipart_data.content_type,
'Accept': 'application/json',
Ndex2.USER_AGENT_KEY: self._get_user_agent(),
'Connection': 'close'
}
response = requests.put(url, data=multipart_data, headers=headers, auth=(self.username, self.password))
return self._return_response(response,
returnjsonundertry=True)
# The Request is streamed, not the Response
def post_multipart(self, route, fields, query_string=None):
if query_string:
url = self.host + route + '?' + query_string
else:
url = self.host + route
multipart_data = MultipartEncoder(fields=fields)
self.logger.debug("POST route: " + url)
headers = {'Content-Type': multipart_data.content_type,
Ndex2.USER_AGENT_KEY: self._get_user_agent(),
'Connection': 'close'
}
response = requests.post(url, data=multipart_data, headers=headers, auth=(self.username, self.password))
return self._return_response(response,
returnjsonundertry=True)
# Network methods
[docs] def save_new_network(self, cx, visibility=None):
"""
Create a new network (cx) on the server
:param cx: Network cx
:type cx: list of dicts
:param visibility: Sets the visibility (PUBLIC or PRIVATE)
:type visibility: string
:raises NDExInvalidCXError: For invalid CX data
:return: Response data
:rtype: string or dict
"""
if cx is None:
raise NDExInvalidCXError('CX is None')
if not isinstance(cx, list):
raise NDExInvalidCXError('CX is not a list')
if len(cx) is 0:
raise NDExInvalidCXError('CX appears to be empty')
indexed_fields = None
#TODO add functionality for indexed_fields when it's supported by the server
if cx[-1] is not None:
if cx[-1].get('status') is None:
cx.append({"status": [{"error": "", "success": True}]})
else:
if len(cx[-1].get('status')) < 1:
cx[-1].get('status').append({"error": "", "success": True})
if sys.version_info.major == 3:
stream = io.BytesIO(json.dumps(cx, cls=DecimalEncoder).encode('utf-8'))
else:
stream = io.BytesIO(json.dumps(cx, cls=DecimalEncoder))
return self.save_cx_stream_as_new_network(stream, visibility=visibility)
[docs] def save_cx_stream_as_new_network(self, cx_stream, visibility=None):
"""
Create a new network from a CX stream.
:param cx_stream: IO stream of cx
:type cx_stream: BytesIO
:param visibility: Sets the visibility (PUBLIC or PRIVATE)
:type visibility: string
:raises NDExUnauthorizedError: If credentials are invalid or not set
:return: Response data
:rtype: string or dict
"""
self._require_auth()
query_string = None
if visibility:
query_string = 'visibility=' + str(visibility)
if self.version.startswith('1.'):
route = '/network/asCX'
else:
route = '/network'
fields = {
'CXNetworkStream': ('filename', cx_stream, 'application/octet-stream')
}
return self.post_multipart(route, fields, query_string=query_string)
[docs] def update_cx_network(self, cx_stream, network_id):
"""
Update the network specified by UUID network_id using the CX stream cx_stream.
:param cx_stream: The network stream.
:param network_id: The UUID of the network.
:type network_id: str
:raises NDExUnauthorizedError: If credentials are invalid or not set
:return: The response.
:rtype: `response object <http://docs.python-requests.org/en/master/user/quickstart/#response-content>`_
"""
self._require_auth()
fields = {
'CXNetworkStream': ('filename', cx_stream, 'application/octet-stream')
}
if self.version.startswith('1.'):
route = '/network/asCX/%s' % network_id
else:
route = "/network/%s" % network_id
return self.put_multipart(route, fields)
[docs] def get_network_as_cx_stream(self, network_id):
"""
Get the existing network with UUID network_id from the NDEx connection as a CX stream.
:param network_id: The UUID of the network.
:type network_id: str
:return: The response.
:rtype: `response object <http://docs.python-requests.org/en/master/user/quickstart/#response-content>`_
"""
if self.version.startswith('1.'):
route = "/network/%s/asCX" % network_id
else:
route = "/network/%s" % network_id
return self.get_stream(route)
[docs] def get_network_aspect_as_cx_stream(self, network_id, aspect_name):
"""
Get the specified aspect of the existing network with UUID network_id from the NDEx connection as a CX stream.
For a list of aspect names look at **Core Aspects** section of
`CX Data Model Documentation <https://home.ndexbio.org/data-model/>`__
:param network_id: The UUID of the network.
:param aspect_name: The aspect NAME.
:type network_id: str
:return: The response.
:rtype: `response object <http://docs.python-requests.org/en/master/user/quickstart/#response-content>`_
"""
if self.version.startswith('1.'):
route = "/network/%s/asCX" % network_id
else:
route = "/network/%s/aspect/%s" % (network_id, aspect_name)
return self.get_stream(route)
[docs] def get_neighborhood_as_cx_stream(self, network_id, search_string, search_depth=1, edge_limit=2500, error_when_limit=True):
"""
Get a CX stream for a subnetwork of the network specified by UUID network_id and a traversal of search_depth
steps around the nodes found by search_string.
:param network_id: The UUID of the network.
:type network_id: str
:param search_string: The search string used to identify the network neighborhood.
:type search_string: str
:param search_depth: The depth of the neighborhood from the core nodes identified.
:type search_depth: int
:param edge_limit: The maximum size of the neighborhood.
:type edge_limit: int
:param error_when_limit: Default value is true. If this value is true the server will stop streaming the network when it hits the edgeLimit, add success: false and error: "EdgeLimitExceeded" in the status aspect and close the CX stream. If this value is set to false the server will return a subnetwork with edge count up to edgeLimit. The status aspect will be a success, and a network attribute {"EdgeLimitExceeded": "true"} will be added to the returned network only if the server hits the edgeLimit..
:type error_when_limit: boolean
:return: The response.
:rtype: `response object <http://docs.python-requests.org/en/master/user/quickstart/#response-content>`_
"""
if self.version.startswith('1.'):
route = "/network/%s/query" % network_id
else:
route = "/search/network/%s/query" % network_id
post_data = {'searchString': search_string,
'searchDepth': search_depth,
'edgeLimit': edge_limit,
'errorWhenLimitIsOver': error_when_limit}
post_json = json.dumps(post_data)
return self.post_stream(route, post_json=post_json)
[docs] def get_neighborhood(self, network_id, search_string, search_depth=1, edge_limit=2500):
"""
Get the CX for a subnetwork of the network specified by UUID network_id and a traversal of search_depth steps
around the nodes found by search_string.
:param network_id: The UUID of the network.
:type network_id: str
:param search_string: The search string used to identify the network neighborhood.
:type search_string: str
:param search_depth: The depth of the neighborhood from the core nodes identified.
:type search_depth: int
:param edge_limit: The maximum size of the neighborhood.
:type edge_limit: int
:return: The CX json object.
:rtype: `response object <http://docs.python-requests.org/en/master/user/quickstart/#response-content>`_
"""
if self.version.startswith('1.'):
raise Exception("get_neighborhood is not supported for versions prior to 2.0, "
"use get_neighborhood_as_cx_stream")
response = self.get_neighborhood_as_cx_stream(network_id, search_string, search_depth=search_depth,
edge_limit=edge_limit)
response_json = response.json()
if isinstance(response_json, dict):
return response_json.get('data')
elif isinstance(response_json, list):
return response_json
return response_json
def get_interconnectquery_as_cx_stream(self, network_id, search_string,
search_depth=1, edge_limit=2500,
error_when_limit=True):
"""
Get a CX stream for a neighborhood subnetwork where all the
paths must start and end at one of the query nodes in the network
specified by networkid.
:param network_id: The UUID of the network.
:type network_id: str
:param search_string: The search string used to identify the network neighborhood.
:type search_string: str
:param search_depth: The depth of the neighborhood from the core nodes identified.
:type search_depth: int
:param edge_limit: The maximum size of the neighborhood.
:type edge_limit: int
:param error_when_limit: Default value is true. If this value is true the server will stop streaming the network when it hits the edgeLimit, add success: false and error: "EdgeLimitExceeded" in the status aspect and close the CX stream. If this value is set to false the server will return a subnetwork with edge count up to edgeLimit. The status aspect will be a success, and a network attribute {"EdgeLimitExceeded": "true"} will be added to the returned network only if the server hits the edgeLimit..
:type error_when_limit: boolean
:return: The response.
:rtype: `response object <http://docs.python-requests.org/en/master/user/quickstart/#response-content>`_
"""
route = "/search/network/%s/interconnectquery" % network_id
post_data = {'searchString': search_string,
'searchDepth': search_depth,
'edgeLimit': edge_limit,
'errorWhenLimitIsOver': error_when_limit}
post_json = json.dumps(post_data)
return self.post_stream(route, post_json=post_json)
def get_interconnectquery(self, network_id, search_string,
search_depth=1, edge_limit=2500,
error_when_limit=True):
"""
Gets a CX network for a neighborhood subnetwork where all the
paths must start and end at one of the query nodes in the network
specified by networkid.
:param network_id: The UUID of the network.
:type network_id: str
:param search_string: The search string used to identify the network neighborhood.
:type search_string: str
:param search_depth: The depth of the neighborhood from the core nodes identified.
:type search_depth: int
:param edge_limit: The maximum size of the neighborhood.
:type edge_limit: int
:param error_when_limit: Default value is true. If this value is true the server will stop streaming the network when it hits the edgeLimit, add success: false and error: "EdgeLimitExceeded" in the status aspect and close the CX stream. If this value is set to false the server will return a subnetwork with edge count up to edgeLimit. The status aspect will be a success, and a network attribute {"EdgeLimitExceeded": "true"} will be added to the returned network only if the server hits the edgeLimit..
:type error_when_limit: boolean
:return: The CX json object.
:rtype: list
"""
response = self.get_interconnectquery_as_cx_stream(network_id,
search_string,
search_depth=search_depth,
edge_limit=edge_limit,
error_when_limit=error_when_limit)
response_json = response.json()
if isinstance(response_json, dict):
return response_json.get('data')
elif isinstance(response_json, list):
return response_json
else:
return response_json
[docs] def search_networks(self, search_string="", account_name=None, start=0, size=100, include_groups=False):
"""
Search for networks based on the search_text, optionally limited to networks owned by the specified
account_name.
:param search_string: The text to search for.
:type search_string: str
:param account_name: The account to search
:type account_name: str
:param start: The number of blocks to skip. Usually zero, but may be used to page results.
:type start: int
:param size: The size of the block.
:type size: int
:param include_groups:
:type include_groups:
:return: The response.
:rtype: `response object <http://docs.python-requests.org/en/master/user/quickstart/#response-content>`_
"""
post_data = {"searchString": search_string}
if self.version.startswith('1.'):
route = "/network/search/%s/%s" % (start, size)
else:
route = "/search/network?start=%s&size=%s" % (start, size)
if include_groups:
post_data["includeGroups"] = True
if account_name:
post_data["accountName"] = account_name
post_json = json.dumps(post_data)
return self.post(route, post_json)
def search_network_nodes(self, network_id, search_string='', limit=5):
post_data = {"searchString": search_string}
if self.version.startswith('1.'):
route = "/network/%s/nodes/%s" % (network_id, limit)
else:
route = "/search/network/%s/nodes?limit=%s" % (network_id, limit)
post_json = json.dumps(post_data)
return self.post(route, post_json)
def find_networks(self, search_string="", account_name=None, skip_blocks=0, block_size=100):
"""
.. deprecated:: 3.3.2
Use :func:`search_networks` instead.
:param search_string:
:param account_name:
:param skip_blocks:
:param block_size:
:return:
"""
self.logger.warning("find_networks is deprecated, please use search_networks")
return self.search_networks(search_string, account_name, skip_blocks, block_size)
def search_networks_by_property_filter(self, search_parameter_dict={}, account_name=None, limit=100):
raise Exception("This function is not supported in NDEx 2.0")
def network_summaries_to_ids(self, network_summaries):
network_ids = []
for network in network_summaries:
network_ids.append(network['externalId'])
return network_ids
[docs] def get_network_summary(self, network_id):
"""
Gets information about a network.
:param network_id: The UUID of the network.
:type network_id: str
:return: Summary
:rtype: dict
"""
if self.version.startswith('1.'):
route = "/network/%s" % network_id
else:
route = "/network/%s/summary" % network_id
return self.get(route)
[docs] def make_network_public(self, network_id):
"""
Makes the network specified by the **network_id** public
by invoking :py:func:`set_network_system_properties` with
``{'visibility': 'PUBLIC'}``
:param network_id: The UUID of the network.
:type network_id: str
:raises NDExUnauthorizedError: If credentials are invalid or not set
:raises requests.exception.HTTPError: If there is some other error
:return: empty string upon success
:rtype: str
"""
if self.version.startswith('1.'):
return self.update_network_profile(network_id, {'visibility': 'PUBLIC'})
return self.set_network_system_properties(network_id, {'visibility': 'PUBLIC'})
def _make_network_public_indexed(self, network_id):
"""
Makes the network specified by the **network_id** public
by invoking :py:func:`set_network_system_properties` with
``{'visibility': 'PUBLIC', 'index_level': 'ALL', 'showcase': True}``
:param network_id: The UUID of the network.
:type network_id: str
:raises NDExUnsupportedCallError: If version of NDEx server is < 2
:raises NDExUnauthorizedError: If credentials are invalid or not set
:raises requests.exception.HTTPError: If there is some other error
:return: empty string upon success
:rtype: str
"""
if self.version.startswith('1.'):
raise NDExUnsupportedCallError('Only 2+ of NDEx supports '
'setting/changing index level')
return self.set_network_system_properties(network_id,
{'visibility': 'PUBLIC',
'index_level': 'ALL',
'showcase': True})
[docs] def make_network_private(self, network_id):
"""
Makes the network specified by the **network_id** private
by invoking :py:func:`set_network_system_properties` with
``{'visibility': 'PRIVATE'}``
:param network_id: The UUID of the network.
:type network_id: str
:raises NDExUnauthorizedError: If credentials are invalid or not set
:raises requests.exception.HTTPError: If there is some other error
:return: empty string upon success
:rtype: str
"""
if self.version.startswith('1.'):
return self.update_network_profile(network_id,
{'visibility': 'PRIVATE'})
return self.set_network_system_properties(network_id,
{'visibility': 'PRIVATE'})
[docs] def get_task_by_id(self, task_id):
"""
Retrieves a task by id
:param task_id: Task id
:type task_id: string
:raises NDExUnauthorizedError: If credentials are invalid or not set
:return: Task
:rtype: dict
"""
self._require_auth()
route = "/task/%s" % task_id
return self.get(route)
[docs] def delete_network(self, network_id, retry=5):
"""
Deletes the specified network from the server
:param network_id: Network id
:type network_id: string
:param retry: Number of times to retry if deleting fails
:type retry: int
:raises NDExUnauthorizedError: If credentials are invalid or not set
:return: Error json if there is an error. Blank
:rtype: string
"""
self._require_auth()
route = "/network/%s" % network_id
count = 0
while count < retry:
try:
return self.delete(route)
except Exception as inst:
d = json.loads(inst.response.content)
if d.get('errorCode').startswith("NDEx_Concurrent_Modification"):
self.logger.debug("retry deleting network in 1 second(" + str(count) + ")")
count += 1
time.sleep(1)
else:
raise inst
raise Exception("Network is locked after " + str(retry) + " retry.")
def get_provenance(self, network_id):
"""
Gets the network provenance
.. deprecated:: 3.3.2
This method has been deprecated.
:param network_id: Network id
:type network_id: string
:return: Provenance
:rtype: dict
"""
route = "/network/%s/provenance" % network_id
return self.get(route)
def set_provenance(self, network_id, provenance):
"""
Sets the network provenance
.. deprecated:: 3.3.2
This method has been deprecated.
:param network_id: Network id
:type network_id: string
:param provenance: Network provcenance
:type provenance: dict
:raises NDExUnauthorizedError: If credentials are invalid or not set
:return: Result
:rtype: dict
"""
self._require_auth()
route = "/network/%s/provenance" % network_id
if isinstance(provenance, dict):
put_json = json.dumps(provenance)
else:
put_json = provenance
return self.put(route, put_json)
[docs] def set_read_only(self, network_id, value):
"""
Sets the read only flag to **value** on the network specified by
**network_id**
:param network_id: Network id
:type network_id: str
:param value: Must :py:const:`True` for read only, :py:const:`False` otherwise
:type value: bool
:raises NDExUnauthorizedError: If credentials are invalid or not set
:raises NDExInvalidParameterError: If non bool is set in
**valid** parameter
:raises requests.exception.HTTPError: If there is some other error
:return: empty string upon success
:rtype: str
"""
return self.set_network_system_properties(network_id,
{"readOnly": value})
[docs] def set_network_properties(self, network_id, network_properties):
"""
Updates properties of network
Starting with version 2.5 of NDEx, any network properties
not in the `network_properties` parameter are left unchanged.
.. warning::
``name, description, version`` network attributes/properties cannot be updated by this method.
Please use :py:func:`update_network_profile` to update these values.
The format of `network_properties` should be a :py:func:`list` of :py:func:`dict`
objects in this format:
.. code-block::
[{
'subNetworkId': '',
'predicateString': '',
'dataType': '',
'value': ''
}]
The ``predicateString`` field above is the network attribute/property name.
The ``dataType`` field above must be one of the following
`types <https://ndex2.readthedocs.io/en/latest/ndex2.html?highlight=list_of_string#supported-data-types>`__
Regardless of ``dataType``, ``value`` should be converted to :py:func:`str` or :py:func:`list` of
:py:func:`str`
For more information please visit the underlying
`REST call documentation <http://openapi.ndextools.org/#/Network/put_network__networkid__properties>`__
Example to add two network properties (``foo``, ``bar``):
.. code-block::
[{
'subNetworkId': '',
'predicateString': 'foo',
'dataType': 'list_of_integer',
'value': ['1', '2', '3']
},{
'subNetworkId': '',
'predicateString': 'bar',
'dataType': 'string',
'value': 'a value for bar as str'
}]
:param network_id: Network id
:type network_id: str
:param network_properties: List of NDEx property value pairs aka network
properties to set on the network. This can
also be a :py:func:`str` in JSON format
:type network_properties: list or str
:raises Exception: If `network_properties` is not a :py:func:`str` or
:py:func:`list`
:raises NDExUnauthorizedError: If credentials are invalid or not set
:raises requests.HTTPError: If there is an error with the request or
if ``name, version, description`` is set
in `network_properties` as a value to
``predicateString``
:return: Empty string or ``1``
:rtype: str or int
"""
self._require_auth()
route = "/network/%s/properties" % network_id
if isinstance(network_properties, list):
put_json = json.dumps(network_properties)
elif isinstance(network_properties, str):
put_json = network_properties
else:
raise Exception("network_properties must be a string or a list "
"of NdexPropertyValuePair objects")
return self.put(route, put_json)
[docs] def get_sample_network(self, network_id):
"""
Gets the sample network
:param network_id: Network id
:type network_id: string
:raises NDExUnauthorizedError: If credentials are invalid or not set
:return: Sample network
:rtype: list of dicts in cx format
"""
route = "/network/%s/sample" % network_id
return self.get(route)
def set_network_sample(self, network_id, sample_cx_network_str):
"""
:param network_id:
:param sample_cx_network_str:
:raises NDExUnauthorizedError: If credentials are invalid or not set
:return:
"""
self._require_auth()
route = "/network/%s/sample" % network_id
# putJson = json.dumps(sample_cx_network_str)
return self.put(route, sample_cx_network_str)
def _validate_network_system_properties(self, network_properties,
skipvalidation=False):
"""
Verifies 'network_properties' passed in is a valid dict that
can be set as network system properties. The dict should
be of format:
{'showcase': (boolean True or False),
'visibility': (str set to 'PUBLIC' or 'PRIVATE'),
'index_level': (str set to 'NONE', 'META', 'ALL'),
'readOnly': (boolean True or False)}
:param network_properties:
:type network_properties: dict
:raises NDExInvalidParameterError: If 'network_properties' is invalid
:return: JSON string of 'network_properties'
:rtype: str
"""
if isinstance(network_properties, dict):
net_props = network_properties
elif isinstance(network_properties, str):
try:
net_props = json.loads(network_properties)
except Exception as e:
raise NDExInvalidParameterError('Error parsing json string: ' +
str(network_properties) +
' : ' + str(e))
else:
raise NDExInvalidParameterError("network_properties must be "
"a string or a dict")
if skipvalidation is not None and skipvalidation is True:
return json.dumps(net_props)
if len(net_props.keys()) == 0:
raise NDExInvalidParameterError('network_properties '
'appears to be empty')
for key in net_props.keys():
if key not in Ndex2.VALID_NETWORK_SYSTEM_PROPERTIES:
raise NDExInvalidParameterError(key + ' is not a valid network '
'system property')
if key == 'readOnly' or key == 'showcase':
if not isinstance(net_props[key], bool):
raise NDExInvalidParameterError(key + ' value must be a '
'bool set to True '
'or False')
elif key == 'index_level':
if not isinstance(net_props[key], str) \
or net_props[key] not in ['NONE', 'META', 'ALL']:
raise NDExInvalidParameterError(key + ' value must be a '
'string set to '
'NONE, META, or ALL')
elif key == 'visibility':
if not isinstance(net_props[key], str) \
or net_props[key] not in ['PUBLIC', 'PRIVATE']:
raise NDExInvalidParameterError(key + ' value must be a '
'string set to '
'PUBLIC or PRIVATE')
return json.dumps(net_props)
[docs] def set_network_system_properties(self, network_id, network_properties,
skipvalidation=False):
"""
Set network system properties on network with UUID specified by
**network_id**
The network properties should be a :py:func:`dict` or a json string of a :py:func:`dict`
in this format:
.. code-block:: json-object
{'showcase': (boolean True or False),
'visibility': (str 'PUBLIC' or 'PRIVATE'),
'index_level': (str 'NONE', 'META', or 'ALL'),
'readOnly': (boolean True or False)}
.. note::
Omit any values from :py:func:`dict` that you do NOT want changed
Definition of **showcase** values:
:py:const:`True` - means network will display in her home page for other users and :py:const:`False` hides the network for other users. where other users includes anonymous users
Definition of **visibility** values:
'PUBLIC' - means it can be found or read by anyone, including anonymous users
'PRIVATE' - is the default, means that it can only be found or read by users according to their permissions
Definition of **index_level** values:
'NONE' - no index
'META' - only index network attributes
'ALL' - full index on the network
Definition of **readOnly** values:
:py:const:`True` - means network is only readonly, False is NOT readonly
This method will validate **network_properties** matches above :py:func:`dict`
unless **skipvalidation** is set to :py:const:`True` in which case the code
only verifies the **network_properties** is valid JSON
:param network_id: Network id
:type network_id: str
:param network_properties: Network properties as :py:func:`dict` or a JSON string
of :py:func:`dict` adhering to structure above.
:type network_properties: dict or str
:param skipvalidation: If :py:const:`True`, only verify **network_properties**
can be parsed/converted to valid JSON
:raises NDExUnsupportedCallError: If version of NDEx server is < 2
:raises NDExUnauthorizedError: If credentials are invalid or not set
:raises NDExInvalidParameterError: If invalid data is set in
**network_properties** parameter
:raises requests.exception.HTTPError: If there is some other error
:return: empty string upon success
:rtype: str
"""
if self.version.startswith('1.'):
raise NDExUnsupportedCallError('This call only works with NDEx 2+')
self._require_auth()
route = "/network/%s/systemproperty" % network_id
# check that the properties are valid
put_json = self._validate_network_system_properties(network_properties,
skipvalidation=
skipvalidation)
return self.put(route, put_json)
[docs] def update_network_profile(self, network_id, network_profile):
"""
Updates the network profile
Any profile attributes specified will be updated but attributes that are not specified will
have no effect - omission of an attribute does not mean deletion of that attribute.
The network profile attributes that can be updated by this method are: 'name', 'description' and 'version'.
.. code-block:: python
{
"name": "string",
"description": "string",
"version": "string",
"visibility": "string",
"properties": [
{
"subNetworkId": "",
"predicateString": "string",
"dataType": "string",
"value": "string"
}
]
}
:param network_id: Network id
:type network_id: string
:param network_profile: Network profile
:type network_profile: dict
:raises NDExUnauthorizedError: If credentials are invalid or not set
:return:
:rtype:
"""
self._require_auth()
if isinstance(network_profile, dict):
if network_profile.get("visibility") and self.version.startswith("2."):
raise Exception("Ndex 2.x doesn't support setting visibility by this function. "
"Please use make_network_public/private function to set network visibility.")
json_data = json.dumps(network_profile)
elif isinstance(network_profile, str):
json_data = network_profile
else:
raise Exception("network_profile must be a string or a dict")
if self.version.startswith('1.'):
route = "/network/%s/summary" % network_id
return self.post(route, json_data)
else:
route = "/network/%s/profile" % network_id
return self.put(route, json_data)
def upload_file(self, filename):
raise NDExError("This function is not supported in this release. Please use the save_new_network "
"function to create new networks in NDEx server.")
[docs] def update_network_group_permission(self, groupid, networkid, permission):
"""
Updated group permissions
:param groupid: Group id
:type groupid: string
:param networkid: Network id
:type networkid: string
:param permission: Network permission
:type permission: string
:return: Result
:rtype: dict
"""
route = "/network/%s/permission?groupid=%s&permission=%s" % (networkid, groupid, permission)
self.put(route)
[docs] def update_network_user_permission(self, userid, networkid, permission):
"""
Updated network user permission
:param userid: User id
:type userid: string
:param networkid: Network id
:type networkid: string
:param permission: Network permission
:type permission: string
:return: Result
:rtype: dict
"""
route = "/network/%s/permission?userid=%s&permission=%s" % (networkid, userid, permission)
self.put(route)
[docs] def grant_networks_to_group(self, groupid, networkids, permission="READ"):
"""
Set group permission for a set of networks
:param groupid: Group id
:type groupid: string
:param networkids: List of network ids
:type networkids: list
:param permission: Network permission
:type permission: string
:return: Result
:rtype: dict
"""
for networkid in networkids:
self.update_network_group_permission(groupid, networkid, permission)
[docs] def get_user_by_username(self, username):
"""
Gets user information as a dict in format:
.. code-block:: json-object
{'properties': {},
'isIndividual': True,
'userName': 'bsmith',
'isVerified': True,
'firstName': 'bob',
'lastName': 'smith',
'emailAddress': 'bob.smith@ndexbio.org',
'diskQuota': 10000000000,
'diskUsed': 3971183103,
'externalId': 'f2c3a7ef-b0d9-4c61-bf31-4c9fcabe4173',
'isDeleted': False,
'modificationTime': 1554410147104,
'creationTime': 1554410138498
}
:param username: User name
:type username: string
:return: user information as dict
:rtype: dict
"""
route = "/user?username=%s" % username
return self.get(route)
def get_network_summaries_for_user(self, username):
network_summaries = self.get_user_network_summaries(username)
# self.search_networks("", username, size=1000)
# if (network_summaries and network_summaries['networks']):
# network_summaries_list = network_summaries['networks']
# else:
# network_summaries_list = []
return network_summaries
[docs] def get_user_network_summaries(self, username, offset=0, limit=1000):
"""
Get a list of network summaries for networks owned by specified user.
It returns not only the networks that the user owns but also the networks that are
shared with them directly.
:param username: the username of the network owner
:type username: str
:param offset: the starting position of the network search
:type offset: int
:param limit:
:type limit:
:return: list of uuids
:rtype: list
"""
user = self.get_user_by_username(username)#.json
if self.version.startswith('1.'):
route = "/user/%s/networksummary/asCX?offset=%s&limit=%s" % (user['externalId'], offset, limit)
else:
route = "/user/%s/networksummary?offset=%s&limit=%s" % (user['externalId'], offset, limit)
network_summaries = self.get_stream(route)
# uuids = None
# if network_summaries:
# uuids = [d.get('externalId') for d in network_summaries.json()]
if network_summaries:
return network_summaries.json()
else:
return None
[docs] def get_network_ids_for_user(self, username):
"""
Get the network uuids owned by the user
:param username: users NDEx username
:type username: str
:return: list of uuids
"""
network_summaries_list = self.get_network_summaries_for_user(username)
return self.network_summaries_to_ids(network_summaries_list)
[docs] def grant_network_to_user_by_username(self, username, network_id, permission):
"""
Grants permission to network for the given user name
:param username: User name
:type username: string
:param network_id: Network id
:type network_id: string
:param permission: Network permission
:type permission: string
:return: Result
:rtype: dict
"""
user = self.get_user_by_username(username).json
self.update_network_user_permission(user["externalid"], network_id, permission)
[docs] def grant_networks_to_user(self, userid, networkids, permission="READ"):
"""
Gives read permission to specified networks for the provided user
:param userid: User id
:type userid: string
:param networkids: Network ids
:type networkids: list of strings
:param permission: Network permissions
:type permission: string (default is READ)
:return: none
:rtype: none
"""
for networkid in networkids:
self.update_network_user_permission(userid, networkid, permission)
def update_status(self):
"""
Updates the admin status, storing the status in the client
object self.status
:return: None
"""
route = "/admin/status"
self.status = self.get(route)
[docs] def create_networkset(self, name, description):
"""
Creates a new network set
:param name: Network set name
:type name: string
:param description: Network set description
:type description: string
:return: URI of the newly created network set
:rtype: string
"""
route = '/networkset'
return self.post(route, json.dumps({"name": name,
"description": description}))
[docs] def get_network_set(self, set_id):
"""
Gets the network set information including the list of networks
.. deprecated:: 3.2.0
Use :func:`get_networkset` instead.
:param set_id: network set id
:type set_id: basestring
:return: network set information
:rtype: dict
"""
return self.get_networkset(set_id)
[docs] def get_networkset(self, set_id):
"""
Gets the network set information including the list of networks
:param set_id: network set id
:type set_id: basestring
:return: network set information
:rtype: dict
"""
route = '/networkset/%s' % set_id
return self.get(route)
[docs] def delete_networkset(self, networkset_id):
"""
Deletes the network set, requires credentials
:param networkset_id: networkset UUID id
:type networkset_id: str
:raises NDExInvalidParameterError: for invalid networkset id parameter
:raises NDExUnauthorizedError: If no credentials or user is not authorized
:raises NDExNotFoundError: If no networkset with id passed in found
:raises NDExError: For any other error with contents of error in message
:return: None upon success
"""
if networkset_id is None:
raise NDExInvalidParameterError('networkset id cannot be None')
if not isinstance(networkset_id, str):
raise NDExInvalidParameterError('networkset id must be a string')
self._require_auth()
route = '/networkset/%s' % networkset_id
res = self.delete(route, raiseforstatus=False,
returnfullresponse=True)
if res.status_code == 204 or res.status_code == 200:
return None
if res.status_code == 401:
raise NDExUnauthorizedError('Not authorized')
if res.status_code == 404:
raise NDExNotFoundError('Network set with id: ' +
str(networkset_id) + ' not found')
msg = 'Unknown error server returned ' \
'status code: ' + str(res.status_code)
try:
jsondata = res.json()
msg += ' : ' + json.dumps(jsondata)
except Exception:
pass
raise NDExError(msg)
[docs] def add_networks_to_networkset(self, set_id, networks):
"""
Add networks to a network set. User must have visibility of all networks being added
:param set_id: network set id
:type set_id: basestring
:param networks: networks that will be added to the set
:type networks: list of strings
:return: None
:rtype: None
"""
route = '/networkset/%s/members' % set_id
post_json = json.dumps(networks)
return self.post(route, post_json)
[docs] def delete_networks_from_networkset(self, set_id, networks, retry=5):
"""
Removes network(s) from a network set.
:param set_id: network set id
:type set_id: basestring
:param networks: networks that will be removed from the set
:type networks: list of strings
:param retry: Number of times to retry
:type retry: int
:return: None
:rtype: None
"""
route = '/networkset/%s/members' % set_id
post_json = json.dumps(networks)
headers = self.s.headers
headers['Content-Type'] = 'application/json;charset=UTF-8'
headers['Accept'] = 'application/json'
headers['User-Agent'] = userAgent + self.user_agent
count = 0
while count < retry:
try:
return self.delete(route, data=post_json)
except Exception as inst:
d = json.loads(inst.response.content)
if d.get('errorCode').startswith("NDEx_Concurrent_Modification"):
self.logger.debug("retry deleting network in 1 second(" + str(count) + ")")
count += 1
time.sleep(1)
else:
raise inst
raise Exception("Network is locked after " + str(retry) + " retry.")
class DecimalEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, decimal.Decimal):
return float(o)
elif isinstance(o, numpy.integer):
return int(o)
elif isinstance(o, bytes):
bytes_string = o.decode('ascii')
return bytes_string
return super(DecimalEncoder, self).encode(o)