sparktk.graph.graph module
# vim: set encoding=utf-8
# Copyright (c) 2016 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
logger = logging.getLogger('sparktk')
from graphframes.graphframe import GraphFrame, _from_java_gf
from py4j.protocol import Py4JJavaError
from pyspark.sql.utils import IllegalArgumentException
from sparktk import TkContext
from sparktk.frame.frame import Frame
from sparktk.arguments import require_type
# import constructors for the API's sake (not actually dependencies of the Graph)
from sparktk.graph.constructors.create import create
from sparktk.graph.constructors.import_orientdb_graph import import_orientdb_graph
__all__ = ["create",
"Graph",
"import_orientdb_graph",
"load"]
class Graph(object):
    """
    sparktk Graph

    Represents a graph with a frame defining vertices and another frame defining edges.  It is implemented as a very
    thin wrapper of the spark GraphFrame (https://github.com/graphframes/graphframes) onto which additional methods
    are available.

    A vertices frame defines the vertices for the graph and must have a schema with a column
    named "id" which provides unique vertex ID.  All other columns are treated as vertex properties.
    If a column is also found named "vertex_type", it will be used as a special label to denote the type
    of vertex, for example, when interfacing with logic (such as a graph DB) which expects a
    specific vertex type.

    An edge frame defines the edges of the graph; schema must have columns named "src" and "dst" which provide the
    vertex ids of the edge.  All other columns are treated as edge properties.  If a column is also found named
    "edge_type", it will be used as a special label to denote the type of edge, for example, when interfacing with logic
    (such as a graph DB) which expects a specific edge type.

    Unlike sparktk Frames, this Graph is immutable (it cannot be changed).  The vertices and edges may be extracted as
    sparktk Frame objects, but those frames then take on a life of their own apart from this graph object.  Those frames
    could be transformed and used to build a new graph object if necessary.

    The underlying spark GraphFrame is available as the 'graphframe' property of this object.

    Examples
    --------

        >>> viewers = tc.frame.create([['fred', 0],
        ...                            ['wilma', 0],
        ...                            ['pebbles', 1],
        ...                            ['betty', 0],
        ...                            ['barney', 0],
        ...                            ['bamm bamm', 1]],
        ...                           schema= [('id', str), ('kids', int)])

        >>> titles = ['Croods', 'Jurassic Park', '2001', 'Ice Age', 'Land Before Time']

        >>> movies = tc.frame.create([[t] for t in titles], schema=[('id', str)])

        >>> vertices = viewers.copy()

        >>> vertices.append(movies)

        >>> vertices.inspect(20)
        [##]  id                kids
        ============================
        [0]   fred                 0
        [1]   wilma                0
        [2]   pebbles              1
        [3]   betty                0
        [4]   barney               0
        [5]   bamm bamm            1
        [6]   Croods            None
        [7]   Jurassic Park     None
        [8]   2001              None
        [9]   Ice Age           None
        [10]  Land Before Time  None

        >>> edges = tc.frame.create([['fred','Croods',5],
        ...                          ['fred','Jurassic Park',5],
        ...                          ['fred','2001',2],
        ...                          ['fred','Ice Age',4],
        ...                          ['wilma','Jurassic Park',3],
        ...                          ['wilma','2001',5],
        ...                          ['wilma','Ice Age',4],
        ...                          ['pebbles','Croods',4],
        ...                          ['pebbles','Land Before Time',3],
        ...                          ['pebbles','Ice Age',5],
        ...                          ['betty','Croods',5],
        ...                          ['betty','Jurassic Park',3],
        ...                          ['betty','Land Before Time',4],
        ...                          ['betty','Ice Age',3],
        ...                          ['barney','Croods',5],
        ...                          ['barney','Jurassic Park',5],
        ...                          ['barney','Land Before Time',3],
        ...                          ['barney','Ice Age',5],
        ...                          ['bamm bamm','Croods',5],
        ...                          ['bamm bamm','Land Before Time',3]],
        ...                         schema = ['src', 'dst', 'rating'])

        >>> edges.inspect(20)
        [##]  src        dst               rating
        =========================================
        [0]   fred       Croods                 5
        [1]   fred       Jurassic Park          5
        [2]   fred       2001                   2
        [3]   fred       Ice Age                4
        [4]   wilma      Jurassic Park          3
        [5]   wilma      2001                   5
        [6]   wilma      Ice Age                4
        [7]   pebbles    Croods                 4
        [8]   pebbles    Land Before Time       3
        [9]   pebbles    Ice Age                5
        [10]  betty      Croods                 5
        [11]  betty      Jurassic Park          3
        [12]  betty      Land Before Time       4
        [13]  betty      Ice Age                3
        [14]  barney     Croods                 5
        [15]  barney     Jurassic Park          5
        [16]  barney     Land Before Time       3
        [17]  barney     Ice Age                5
        [18]  bamm bamm  Croods                 5
        [19]  bamm bamm  Land Before Time       3

        >>> graph = tc.graph.create(vertices, edges)

        >>> graph
        Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])

        >>> graph.save("sandbox/old_movie_graph")

        >>> restored = tc.load("sandbox/old_movie_graph")

        >>> restored
        Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])

    """

    def __init__(self, tc, source_or_vertices_frame, edges_frame=None):
        """
        Wraps an existing graph source, or builds a graph from a vertices frame and an edges frame.

        Parameters
        ----------

        :param tc: context object; its ``sc._jvm`` and ``_jutils`` attributes are used to reach the JVM
        :param source_or_vertices_frame: a sparktk Frame of vertices, a scala Graph, a python GraphFrame or a
                                         scala GraphFrame
        :param edges_frame: a sparktk Frame of edges when the first argument is a vertices Frame; it is
                            checked with require_type against Frame in that case and against None otherwise

        Raises TypeError when the source is none of the supported types.
        """
        self._tc = tc
        self._scala = None

        # (note that the Scala code will validate appropriate frame schemas)

        if isinstance(source_or_vertices_frame, Frame):
            # Python Vertices and Edges Frames
            vertices_frame = source_or_vertices_frame
            require_type(Frame,
                         edges_frame,
                         'edges_frame',
                         "Providing a vertices frame requires also providing an edges frame")
            self._scala = self._create_scala_graph_from_scala_frames(self._tc,
                                                                     vertices_frame._scala,
                                                                     edges_frame._scala)
        else:
            source = source_or_vertices_frame
            require_type(None,
                         edges_frame,
                         'edges_frame',
                         'If edges_frames is provided, then a valid vertex frame must be provided as the first arg, instead of type %s' % type(source))
            if self._is_scala_graph(source):
                # Scala Graph
                self._scala = source
            elif isinstance(source, GraphFrame):
                # python GraphFrame
                scala_graphframe = source._jvm_graph
                self._scala = self._create_scala_graph_from_scala_graphframe(self._tc, scala_graphframe)
            elif self._is_scala_graphframe(source):
                # scala GraphFrame
                self._scala = self._create_scala_graph_from_scala_graphframe(self._tc, source)
            else:
                raise TypeError("Cannot create from source type %s" % type(source))

    def __repr__(self):
        """Returns the string representation produced by the underlying scala graph"""
        return self._scala.toString()

    @staticmethod
    def _get_scala_graph_class(tc):
        """Gets reference to the sparktk scala Graph class"""
        return tc.sc._jvm.org.trustedanalytics.sparktk.graph.Graph

    @staticmethod
    def _get_scala_graphframe_class(tc):
        """Gets reference to the scala GraphFrame class"""
        return tc.sc._jvm.org.graphframes.GraphFrame

    @staticmethod
    def _create_scala_graph_from_scala_graphframe(tc, scala_graphframe):
        """Creates a scala Graph from a scala GraphFrame; JVM-side errors are re-raised as ValueError"""
        try:
            return tc.sc._jvm.org.trustedanalytics.sparktk.graph.Graph(scala_graphframe)
        except (Py4JJavaError, IllegalArgumentException) as e:
            raise ValueError(str(e))

    @staticmethod
    def _create_scala_graph_from_scala_frames(tc, scala_vertices_frame, scala_edges_frame):
        """Creates a scala Graph from scala vertices and edges frames; JVM-side errors are re-raised as ValueError"""
        try:
            return tc.sc._jvm.org.trustedanalytics.sparktk.graph.internal.constructors.FromFrames.create(scala_vertices_frame, scala_edges_frame)
        except (Py4JJavaError, IllegalArgumentException) as e:
            raise ValueError(str(e))

    # this one for the Loader:
    @staticmethod
    def _from_scala(tc, scala_graph):
        """creates a python Graph for the given scala Graph"""
        return Graph(tc, scala_graph)

    def _is_scala_graph(self, item):
        """Tests whether item is a JVM instance of the sparktk scala Graph class"""
        return self._tc._jutils.is_jvm_instance_of(item, self._get_scala_graph_class(self._tc))

    def _is_scala_graphframe(self, item):
        """Tests whether item is a JVM instance of the scala GraphFrame class"""
        return self._tc._jutils.is_jvm_instance_of(item, self._get_scala_graphframe_class(self._tc))

    ##########################################################################
    # API
    ##########################################################################

    @property
    def graphframe(self):
        """The underlying graphframe object which exposes several methods and properties"""
        return _from_java_gf(self._scala.graphFrame(), self._tc.sql_context)

    def create_vertices_frame(self):
        """Creates a frame representing the vertices stored in this graph"""
        # Frame is already imported at module level (and used by __init__), so no local import is needed
        return Frame(self._tc, self._scala.graphFrame().vertices())

    def create_edges_frame(self):
        """Creates a frame representing the edges stored in this graph"""
        return Frame(self._tc, self._scala.graphFrame().edges())

    # Graph Operations
    from sparktk.graph.ops.connected_components import connected_components
    from sparktk.graph.ops.clustering_coefficient import clustering_coefficient
    from sparktk.graph.ops.degrees import degrees
    from sparktk.graph.ops.export_to_orientdb import export_to_orientdb
    from sparktk.graph.ops.global_clustering_coefficient import global_clustering_coefficient
    from sparktk.graph.ops.label_propagation import label_propagation
    from sparktk.graph.ops.loopy_belief_propagation import loopy_belief_propagation
    from sparktk.graph.ops.page_rank import page_rank
    from sparktk.graph.ops.save import save
    from sparktk.graph.ops.triangle_count import triangle_count
    from sparktk.graph.ops.vertex_count import vertex_count
    from sparktk.graph.ops.weighted_degrees import weighted_degrees
def load(path, tc=TkContext.implicit):
    """
    Load a Graph from the given path

    Parameters
    ----------

    :param path: location handed to ``tc.load``
    :param tc: context used for loading; it is passed to ``TkContext.validate`` before use
    :return: whatever ``tc.load(path, Graph)`` produces
    """
    TkContext.validate(tc)
    loaded = tc.load(path, Graph)
    return loaded
Functions
def create(
source_or_vertices_frame, edges_frame=None, tc=<class 'sparktk.arguments.implicit'>)
Create a sparktk Graph from two sparktk Frames (or some other source)
source_or_vertices_frame: | a graph source or a vertices frame Valid sources include: a python and spark GraphFrame, or a scala Graph Otherwise if a vertices frame is provided, then the edges_frame arg must also be supplied. A vertices frame defines the vertices for the graph and must have a schema with a column named "id" which provides unique vertex ID. All other columns are treated as vertex properties. If a column is also found named "vertex_type", it will be used as a special label to denote the type of vertex, for example, when interfacing with logic (such as a graph DB) which expects a specific vertex type. |
edges_frame | (valid only if the source_or_vertices_frame arg is a vertices Frame): | An edge frame defines the edges of the graph; schema must have columns names "src" and "dst" which provide the vertex ids of the edge. All other columns are treated as edge properties. If a column is also found named "edge_type", it will be used as a special label to denote the type of edge, for example, when interfacing with logic (such as a graph DB) which expects a specific edge type. |
def create(source_or_vertices_frame, edges_frame=None, tc=TkContext.implicit):
    """
    Create a sparktk Graph from two sparktk Frames (or some other source)

    Parameters
    ----------

    :param source_or_vertices_frame: a graph source or a vertices frame
                        Valid sources include: a python and spark GraphFrame, or a scala Graph
                        Otherwise if a vertices frame is provided, then the edges_frame arg must also be supplied.
                        A vertices frame defines the vertices for the graph and must have a schema with a column
                        named "id" which provides unique vertex ID.  All other columns are treated as vertex
                        properties.  If a column is also found named "vertex_type", it will be used as a special
                        label to denote the type of vertex, for example, when interfacing with logic (such as a
                        graph DB) which expects a specific vertex type.

    :param edges_frame: (valid only if the source_or_vertices_frame arg is a vertices Frame) An edge frame defines
                        the edges of the graph; schema must have columns named "src" and "dst" which provide the
                        vertex ids of the edge.  All other columns are treated as edge properties.  If a column is
                        also found named "edge_type", it will be used as a special label to denote the type of
                        edge, for example, when interfacing with logic (such as a graph DB) which expects a
                        specific edge type.
    """
    TkContext.validate(tc)
    # Graph is imported at call time, exactly as the original does
    from sparktk.graph.graph import Graph
    new_graph = Graph(tc, source_or_vertices_frame, edges_frame)
    return new_graph
def import_orientdb_graph(
db_url, user_name, password, root_password, tc=<class 'sparktk.arguments.implicit'>)
Import graph from OrientDB to spark-tk as spark-tk graph (Spark GraphFrame)
Parameters
----------
:param:(str) db_url: OrientDB URI
:param:(str) user_name: the database username
:param:(str) password: the database password
:param :(str)root_password: OrientDB server password
Example
-------
>>> v = tc.frame.create([("a", "Alice", 34,"F"),
... ("b", "Bob", 36,"M"),
... ("c", "Charlie", 30,"M"),
... ("d", "David", 29,"M"),
... ("e", "Esther", 32,"F"),
... ("f", "Fanny", 36,"F"),
... ], ["id", "name", "age","gender"])
>>> e = tc.frame.create([("a", "b", "friend"),
... ("b", "c", "follow"),
... ("c", "b", "follow"),
... ("f", "c", "follow"),
... ("e", "f", "follow"),
... ("e", "d", "friend"),
... ("d", "a", "friend"),
... ("a", "e", "friend")
... ], ["src", "dst", "relationship"])
>>> sparktk_graph = tc.graph.create(v,e)
>>> db = "test_db"
>>> sparktk_graph.export_to_orientdb(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password",vertex_type_column_name= "gender",edge_type_column_name="relationship")
>>> imported_gf = tc.graph.import_orientdb_graph(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password")
>>> imported_gf.graphframe.vertices.show()
+-------+------+---+---+
|   name|gender| id|age|
+-------+------+---+---+
|    Bob|     M|  b| 36|
|  David|     M|  d| 29|
|Charlie|     M|  c| 30|
|  Alice|     F|  a| 34|
| Esther|     F|  e| 32|
|  Fanny|     F|  f| 36|
+-------+------+---+---+
>>> imported_gf.graphframe.edges.show()
+---+------------+---+
|dst|relationship|src|
+---+------------+---+
|  f|      follow|  e|
|  b|      follow|  c|
|  c|      follow|  b|
|  c|      follow|  f|
|  b|      friend|  a|
|  a|      friend|  d|
|  d|      friend|  e|
|  e|      friend|  a|
+---+------------+---+
def import_orientdb_graph(db_url, user_name, password, root_password,tc=TkContext.implicit):
    """
    Import graph from OrientDB to spark-tk as spark-tk graph (Spark GraphFrame)

    Parameters
    ----------

    :param:(str) db_url: OrientDB URI
    :param:(str) user_name: the database username
    :param:(str) password: the database password
    :param :(str)root_password: OrientDB server password

    Example
    -------

        >>> v = tc.frame.create([("a", "Alice", 34,"F"),
        ...                      ("b", "Bob", 36,"M"),
        ...                      ("c", "Charlie", 30,"M"),
        ...                      ("d", "David", 29,"M"),
        ...                      ("e", "Esther", 32,"F"),
        ...                      ("f", "Fanny", 36,"F"),
        ...                      ], ["id", "name", "age","gender"])

        >>> e = tc.frame.create([("a", "b", "friend"),
        ...                      ("b", "c", "follow"),
        ...                      ("c", "b", "follow"),
        ...                      ("f", "c", "follow"),
        ...                      ("e", "f", "follow"),
        ...                      ("e", "d", "friend"),
        ...                      ("d", "a", "friend"),
        ...                      ("a", "e", "friend")
        ...                      ], ["src", "dst", "relationship"])

        >>> sparktk_graph = tc.graph.create(v,e)

        >>> db = "test_db"

        >>> sparktk_graph.export_to_orientdb(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password",vertex_type_column_name= "gender",edge_type_column_name="relationship")

        >>> imported_gf = tc.graph.import_orientdb_graph(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password")

        >>> imported_gf.graphframe.vertices.show()

        +-------+------+---+---+
        |   name|gender| id|age|
        +-------+------+---+---+
        |    Bob|     M|  b| 36|
        |  David|     M|  d| 29|
        |Charlie|     M|  c| 30|
        |  Alice|     F|  a| 34|
        | Esther|     F|  e| 32|
        |  Fanny|     F|  f| 36|
        +-------+------+---+---+

        >>> imported_gf.graphframe.edges.show()

        +---+------------+---+
        |dst|relationship|src|
        +---+------------+---+
        |  f|      follow|  e|
        |  b|      follow|  c|
        |  c|      follow|  b|
        |  c|      follow|  f|
        |  b|      friend|  a|
        |  a|      friend|  d|
        |  d|      friend|  e|
        |  e|      friend|  a|
        +---+------------+---+

    """
    TkContext.validate(tc)
    # Resolve the scala importer object first, then hand it the scala SparkContext and the connection settings
    scala_importer = tc.sc._jvm.org.trustedanalytics.sparktk.graph.internal.constructors.fromorientdb.ImportFromOrientdb
    scala_sc = tc.jutils.get_scala_sc()
    scala_graph = scala_importer.importOrientdbGraph(scala_sc, db_url, user_name, password, root_password)
    from sparktk.graph.graph import Graph
    imported = Graph(tc, scala_graph)
    return imported
def load(
path, tc=<class 'sparktk.arguments.implicit'>)
load Graph from given path
def load(path, tc=TkContext.implicit):
    """
    Load a Graph from the given path

    Parameters
    ----------

    :param path: location handed to ``tc.load``
    :param tc: context used for loading; it is passed to ``TkContext.validate`` before use
    :return: whatever ``tc.load(path, Graph)`` produces
    """
    TkContext.validate(tc)
    loaded = tc.load(path, Graph)
    return loaded
Classes
class Graph
sparktk Graph
Represents a graph with a frame defining vertices and another frame defining edges. It is implemented as a very thin wrapper of the spark GraphFrame (https://github.com/graphframes/graphframes) onto which additional methods are available.
A vertices frame defines the vertices for the graph and must have a schema with a column named "id" which provides unique vertex ID. All other columns are treated as vertex properties. If a column is also found named "vertex_type", it will be used as a special label to denote the type of vertex, for example, when interfacing with logic (such as a graph DB) which expects a specific vertex type.
An edge frame defines the edges of the graph; schema must have columns names "src" and "dst" which provide the vertex ids of the edge. All other columns are treated as edge properties. If a column is also found named "edge_type", it will be used as a special label to denote the type of edge, for example, when interfacing with logic (such as a graph DB) which expects a specific edge type.
Unlike sparktk Frames, this Graph is immutable (it cannot be changed). The vertices and edges may be extracted as sparktk Frame objects, but those frames then take on a life of their own apart from this graph object. Those frames could be transformed and used to build a new graph object if necessary.
The underlying spark GraphFrame is available as the 'graphframe' property of this object.
>>> viewers = tc.frame.create([['fred', 0],
... ['wilma', 0],
... ['pebbles', 1],
... ['betty', 0],
... ['barney', 0],
... ['bamm bamm', 1]],
... schema= [('id', str), ('kids', int)])
>>> titles = ['Croods', 'Jurassic Park', '2001', 'Ice Age', 'Land Before Time']
>>> movies = tc.frame.create([[t] for t in titles], schema=[('id', str)])
>>> vertices = viewers.copy()
>>> vertices.append(movies)
>>> vertices.inspect(20)
[##] id kids
============================
[0] fred 0
[1] wilma 0
[2] pebbles 1
[3] betty 0
[4] barney 0
[5] bamm bamm 1
[6] Croods None
[7] Jurassic Park None
[8] 2001 None
[9] Ice Age None
[10] Land Before Time None
>>> edges = tc.frame.create([['fred','Croods',5],
... ['fred','Jurassic Park',5],
... ['fred','2001',2],
... ['fred','Ice Age',4],
... ['wilma','Jurassic Park',3],
... ['wilma','2001',5],
... ['wilma','Ice Age',4],
... ['pebbles','Croods',4],
... ['pebbles','Land Before Time',3],
... ['pebbles','Ice Age',5],
... ['betty','Croods',5],
... ['betty','Jurassic Park',3],
... ['betty','Land Before Time',4],
... ['betty','Ice Age',3],
... ['barney','Croods',5],
... ['barney','Jurassic Park',5],
... ['barney','Land Before Time',3],
... ['barney','Ice Age',5],
... ['bamm bamm','Croods',5],
... ['bamm bamm','Land Before Time',3]],
... schema = ['src', 'dst', 'rating'])
>>> edges.inspect(20)
[##] src dst rating
=========================================
[0] fred Croods 5
[1] fred Jurassic Park 5
[2] fred 2001 2
[3] fred Ice Age 4
[4] wilma Jurassic Park 3
[5] wilma 2001 5
[6] wilma Ice Age 4
[7] pebbles Croods 4
[8] pebbles Land Before Time 3
[9] pebbles Ice Age 5
[10] betty Croods 5
[11] betty Jurassic Park 3
[12] betty Land Before Time 4
[13] betty Ice Age 3
[14] barney Croods 5
[15] barney Jurassic Park 5
[16] barney Land Before Time 3
[17] barney Ice Age 5
[18] bamm bamm Croods 5
[19] bamm bamm Land Before Time 3
>>> graph = tc.graph.create(vertices, edges)
>>> graph
Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])
>>> graph.save("sandbox/old_movie_graph")
>>> restored = tc.load("sandbox/old_movie_graph")
>>> restored
Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])
class Graph(object):
    """
    sparktk Graph

    Represents a graph with a frame defining vertices and another frame defining edges.  It is implemented as a very
    thin wrapper of the spark GraphFrame (https://github.com/graphframes/graphframes) onto which additional methods
    are available.

    A vertices frame defines the vertices for the graph and must have a schema with a column
    named "id" which provides unique vertex ID.  All other columns are treated as vertex properties.
    If a column is also found named "vertex_type", it will be used as a special label to denote the type
    of vertex, for example, when interfacing with logic (such as a graph DB) which expects a
    specific vertex type.

    An edge frame defines the edges of the graph; schema must have columns named "src" and "dst" which provide the
    vertex ids of the edge.  All other columns are treated as edge properties.  If a column is also found named
    "edge_type", it will be used as a special label to denote the type of edge, for example, when interfacing with logic
    (such as a graph DB) which expects a specific edge type.

    Unlike sparktk Frames, this Graph is immutable (it cannot be changed).  The vertices and edges may be extracted as
    sparktk Frame objects, but those frames then take on a life of their own apart from this graph object.  Those frames
    could be transformed and used to build a new graph object if necessary.

    The underlying spark GraphFrame is available as the 'graphframe' property of this object.

    Examples
    --------

        >>> viewers = tc.frame.create([['fred', 0],
        ...                            ['wilma', 0],
        ...                            ['pebbles', 1],
        ...                            ['betty', 0],
        ...                            ['barney', 0],
        ...                            ['bamm bamm', 1]],
        ...                           schema= [('id', str), ('kids', int)])

        >>> titles = ['Croods', 'Jurassic Park', '2001', 'Ice Age', 'Land Before Time']

        >>> movies = tc.frame.create([[t] for t in titles], schema=[('id', str)])

        >>> vertices = viewers.copy()

        >>> vertices.append(movies)

        >>> vertices.inspect(20)
        [##]  id                kids
        ============================
        [0]   fred                 0
        [1]   wilma                0
        [2]   pebbles              1
        [3]   betty                0
        [4]   barney               0
        [5]   bamm bamm            1
        [6]   Croods            None
        [7]   Jurassic Park     None
        [8]   2001              None
        [9]   Ice Age           None
        [10]  Land Before Time  None

        >>> edges = tc.frame.create([['fred','Croods',5],
        ...                          ['fred','Jurassic Park',5],
        ...                          ['fred','2001',2],
        ...                          ['fred','Ice Age',4],
        ...                          ['wilma','Jurassic Park',3],
        ...                          ['wilma','2001',5],
        ...                          ['wilma','Ice Age',4],
        ...                          ['pebbles','Croods',4],
        ...                          ['pebbles','Land Before Time',3],
        ...                          ['pebbles','Ice Age',5],
        ...                          ['betty','Croods',5],
        ...                          ['betty','Jurassic Park',3],
        ...                          ['betty','Land Before Time',4],
        ...                          ['betty','Ice Age',3],
        ...                          ['barney','Croods',5],
        ...                          ['barney','Jurassic Park',5],
        ...                          ['barney','Land Before Time',3],
        ...                          ['barney','Ice Age',5],
        ...                          ['bamm bamm','Croods',5],
        ...                          ['bamm bamm','Land Before Time',3]],
        ...                         schema = ['src', 'dst', 'rating'])

        >>> edges.inspect(20)
        [##]  src        dst               rating
        =========================================
        [0]   fred       Croods                 5
        [1]   fred       Jurassic Park          5
        [2]   fred       2001                   2
        [3]   fred       Ice Age                4
        [4]   wilma      Jurassic Park          3
        [5]   wilma      2001                   5
        [6]   wilma      Ice Age                4
        [7]   pebbles    Croods                 4
        [8]   pebbles    Land Before Time       3
        [9]   pebbles    Ice Age                5
        [10]  betty      Croods                 5
        [11]  betty      Jurassic Park          3
        [12]  betty      Land Before Time       4
        [13]  betty      Ice Age                3
        [14]  barney     Croods                 5
        [15]  barney     Jurassic Park          5
        [16]  barney     Land Before Time       3
        [17]  barney     Ice Age                5
        [18]  bamm bamm  Croods                 5
        [19]  bamm bamm  Land Before Time       3

        >>> graph = tc.graph.create(vertices, edges)

        >>> graph
        Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])

        >>> graph.save("sandbox/old_movie_graph")

        >>> restored = tc.load("sandbox/old_movie_graph")

        >>> restored
        Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])

    """

    def __init__(self, tc, source_or_vertices_frame, edges_frame=None):
        """
        Wraps an existing graph source, or builds a graph from a vertices frame and an edges frame.

        Parameters
        ----------

        :param tc: context object; its ``sc._jvm`` and ``_jutils`` attributes are used to reach the JVM
        :param source_or_vertices_frame: a sparktk Frame of vertices, a scala Graph, a python GraphFrame or a
                                         scala GraphFrame
        :param edges_frame: a sparktk Frame of edges when the first argument is a vertices Frame; it is
                            checked with require_type against Frame in that case and against None otherwise

        Raises TypeError when the source is none of the supported types.
        """
        self._tc = tc
        self._scala = None

        # (note that the Scala code will validate appropriate frame schemas)

        if isinstance(source_or_vertices_frame, Frame):
            # Python Vertices and Edges Frames
            vertices_frame = source_or_vertices_frame
            require_type(Frame,
                         edges_frame,
                         'edges_frame',
                         "Providing a vertices frame requires also providing an edges frame")
            self._scala = self._create_scala_graph_from_scala_frames(self._tc,
                                                                     vertices_frame._scala,
                                                                     edges_frame._scala)
        else:
            source = source_or_vertices_frame
            require_type(None,
                         edges_frame,
                         'edges_frame',
                         'If edges_frames is provided, then a valid vertex frame must be provided as the first arg, instead of type %s' % type(source))
            if self._is_scala_graph(source):
                # Scala Graph
                self._scala = source
            elif isinstance(source, GraphFrame):
                # python GraphFrame
                scala_graphframe = source._jvm_graph
                self._scala = self._create_scala_graph_from_scala_graphframe(self._tc, scala_graphframe)
            elif self._is_scala_graphframe(source):
                # scala GraphFrame
                self._scala = self._create_scala_graph_from_scala_graphframe(self._tc, source)
            else:
                raise TypeError("Cannot create from source type %s" % type(source))

    def __repr__(self):
        """Returns the string representation produced by the underlying scala graph"""
        return self._scala.toString()

    @staticmethod
    def _get_scala_graph_class(tc):
        """Gets reference to the sparktk scala Graph class"""
        return tc.sc._jvm.org.trustedanalytics.sparktk.graph.Graph

    @staticmethod
    def _get_scala_graphframe_class(tc):
        """Gets reference to the scala GraphFrame class"""
        return tc.sc._jvm.org.graphframes.GraphFrame

    @staticmethod
    def _create_scala_graph_from_scala_graphframe(tc, scala_graphframe):
        """Creates a scala Graph from a scala GraphFrame; JVM-side errors are re-raised as ValueError"""
        try:
            return tc.sc._jvm.org.trustedanalytics.sparktk.graph.Graph(scala_graphframe)
        except (Py4JJavaError, IllegalArgumentException) as e:
            raise ValueError(str(e))

    @staticmethod
    def _create_scala_graph_from_scala_frames(tc, scala_vertices_frame, scala_edges_frame):
        """Creates a scala Graph from scala vertices and edges frames; JVM-side errors are re-raised as ValueError"""
        try:
            return tc.sc._jvm.org.trustedanalytics.sparktk.graph.internal.constructors.FromFrames.create(scala_vertices_frame, scala_edges_frame)
        except (Py4JJavaError, IllegalArgumentException) as e:
            raise ValueError(str(e))

    # this one for the Loader:
    @staticmethod
    def _from_scala(tc, scala_graph):
        """creates a python Graph for the given scala Graph"""
        return Graph(tc, scala_graph)

    def _is_scala_graph(self, item):
        """Tests whether item is a JVM instance of the sparktk scala Graph class"""
        return self._tc._jutils.is_jvm_instance_of(item, self._get_scala_graph_class(self._tc))

    def _is_scala_graphframe(self, item):
        """Tests whether item is a JVM instance of the scala GraphFrame class"""
        return self._tc._jutils.is_jvm_instance_of(item, self._get_scala_graphframe_class(self._tc))

    ##########################################################################
    # API
    ##########################################################################

    @property
    def graphframe(self):
        """The underlying graphframe object which exposes several methods and properties"""
        return _from_java_gf(self._scala.graphFrame(), self._tc.sql_context)

    def create_vertices_frame(self):
        """Creates a frame representing the vertices stored in this graph"""
        # Frame is already imported at module level (and used by __init__), so no local import is needed
        return Frame(self._tc, self._scala.graphFrame().vertices())

    def create_edges_frame(self):
        """Creates a frame representing the edges stored in this graph"""
        return Frame(self._tc, self._scala.graphFrame().edges())

    # Graph Operations
    from sparktk.graph.ops.connected_components import connected_components
    from sparktk.graph.ops.clustering_coefficient import clustering_coefficient
    from sparktk.graph.ops.degrees import degrees
    from sparktk.graph.ops.export_to_orientdb import export_to_orientdb
    from sparktk.graph.ops.global_clustering_coefficient import global_clustering_coefficient
    from sparktk.graph.ops.label_propagation import label_propagation
    from sparktk.graph.ops.loopy_belief_propagation import loopy_belief_propagation
    from sparktk.graph.ops.page_rank import page_rank
    from sparktk.graph.ops.save import save
    from sparktk.graph.ops.triangle_count import triangle_count
    from sparktk.graph.ops.vertex_count import vertex_count
    from sparktk.graph.ops.weighted_degrees import weighted_degrees
Ancestors (in MRO)
- Graph
- __builtin__.object
Instance variables
var graphframe
The underlying graphframe object which exposes several methods and properties
Methods
def __init__(
self, tc, source_or_vertices_frame, edges_frame=None)
def __init__(self, tc, source_or_vertices_frame, edges_frame=None):
    """
    Wraps an existing graph source, or builds a graph from a vertices frame and an edges frame.

    Parameters
    ----------

    :param tc: context object, stored on the instance and used to reach the JVM
    :param source_or_vertices_frame: a sparktk Frame of vertices, a scala Graph, a python GraphFrame or a
                                     scala GraphFrame
    :param edges_frame: a sparktk Frame of edges when the first argument is a vertices Frame; it is
                        checked with require_type against Frame in that case and against None otherwise

    Raises TypeError when the source is none of the supported types.
    """
    self._tc = tc
    self._scala = None

    # (note that the Scala code will validate appropriate frame schemas)

    # Case 1: python vertices and edges Frames
    if isinstance(source_or_vertices_frame, Frame):
        require_type(Frame,
                     edges_frame,
                     'edges_frame',
                     "Providing a vertices frame requires also providing an edges frame")
        self._scala = self._create_scala_graph_from_scala_frames(tc,
                                                                 source_or_vertices_frame._scala,
                                                                 edges_frame._scala)
        return

    # Case 2: some other graph source, for which edges_frame is checked against None
    source = source_or_vertices_frame
    require_type(None,
                 edges_frame,
                 'edges_frame',
                 'If edges_frames is provided, then a valid vertex frame must be provided as the first arg, instead of type %s' % type(source))

    if self._is_scala_graph(source):
        # already a scala Graph, keep it as is
        self._scala = source
        return

    if isinstance(source, GraphFrame):
        # python GraphFrame: unwrap to its scala counterpart
        scala_graphframe = source._jvm_graph
    elif self._is_scala_graphframe(source):
        # scala GraphFrame: usable directly
        scala_graphframe = source
    else:
        raise TypeError("Cannot create from source type %s" % type(source))

    self._scala = self._create_scala_graph_from_scala_graphframe(tc, scala_graphframe)
def clustering_coefficient(
self)
The clustering coefficient of a vertex provides a measure of how tightly clustered that vertex's neighborhood is.
Formally:
.. math::
cc(v) = \frac{ \| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\}, \{v,w \} \in E \} \| }{\| \{ (u,v,w) \in V^3: \ \{v, u \}, \{v, w\} \in E \} \|}
For further reading on clustering coefficients, see http://en.wikipedia.org/wiki/Clustering_coefficient.
This method returns a frame with each vertex id associated with its local clustering coefficient.
Returns | (Frame): | Frame containing the vertex id's and their clustering coefficient |
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.clustering_coefficient()
>>> result.inspect()
[#] id clustering_coefficient
===============================
[0] 1 0.333333333333
[1] 2 1.0
[2] 3 1.0
[3] 4 0.0
[4] 5 0.0
def clustering_coefficient(self):
    r"""
    The clustering coefficient of a vertex provides a measure of how
    tightly clustered that vertex's neighborhood is.

    Formally:

    .. math::

        cc(v)  = \frac{ \| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\}, \{v,w \} \in E \} \| }
                      {\| \{ (u,v,w) \in V^3: \ \{v, u \}, \{v, w\} \in E \} \|}

    For further reading on clustering
    coefficients, see http://en.wikipedia.org/wiki/Clustering_coefficient.

    This method returns a frame with the vertex id associated with its local
    clustering coefficient

    Parameters
    ----------

    :return: (Frame) Frame containing the vertex id's and their clustering coefficient

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.clustering_coefficient()

        >>> result.inspect()
        [#]  id  clustering_coefficient
        ===============================
        [0]   1          0.333333333333
        [1]   2                     1.0
        [2]   3                     1.0
        [3]   4                     0.0
        [4]   5                     0.0

    """
    # NOTE: the docstring is a raw string so the LaTeX backslashes survive; in a plain string
    # "\f" (from "\frac") is a form-feed escape and a trailing "\" swallows the newline.
    from sparktk.frame.frame import Frame
    return Frame(self._tc, self._scala.clusteringCoefficient())
def connected_components(
self)
Connected components groups all the vertices in a particular graph by whether or not there is a path between them. This method returns a frame with the vertices and their corresponding component.
Returns | (Frame): | Frame containing the vertex id's and their components |
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.connected_components()
>>> result.inspect()
[#] id component
==================
[0] 1 1
[1] 2 1
[2] 3 1
[3] 4 4
[4] 5 4
def connected_components(self):
    """
    Groups the vertices of the graph into connected components: two vertices
    share a component when there is a path between them. The result is a
    frame pairing every vertex with its component.

    Parameters
    ----------

    :return: (Frame) Frame containing the vertex ids and their components

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [4, 5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.connected_components()
        >>> result.inspect()
        [#]  id  component
        ==================
        [0]   1          1
        [1]   2          1
        [2]   3          1
        [3]   4          4
        [4]   5          4

    """
    # Function-scope import: resolved when the method is called.
    from sparktk.frame.frame import Frame

    tc = self._tc
    scala_components = self._scala.connectedComponents()
    # Wrap the Scala-side result in a python Frame bound to the same context.
    return Frame(tc, scala_components)
def create_edges_frame(
self)
Creates a frame representing the edges stored in this graph
def create_edges_frame(self):
    """
    Builds a frame from the edges held by this graph's underlying GraphFrame

    :return: (Frame) Frame wrapping the edges of the graph
    """
    from sparktk.frame.frame import Frame

    tc = self._tc
    graph_frame = self._scala.graphFrame()
    return Frame(tc, graph_frame.edges())
def create_vertices_frame(
self)
Creates a frame representing the vertices stored in this graph
def create_vertices_frame(self):
    """
    Builds a frame from the vertices held by this graph's underlying GraphFrame

    :return: (Frame) Frame wrapping the vertices of the graph
    """
    from sparktk.frame.frame import Frame

    tc = self._tc
    graph_frame = self._scala.graphFrame()
    return Frame(tc, graph_frame.vertices())
def degrees(
self, degree_option='undirected')
Degree Calculation
A fundamental quantity in graph analysis is the degree of a vertex: The degree of a vertex is the number of edges adjacent to it.
For a directed edge relation, a vertex has both an out-degree (the number of edges leaving the vertex) and an in-degree (the number of edges entering the vertex).
degree_option | (String): | Either in, out or undirected. String describing the direction of edges |
Returns | (Frame): | Frame containing the vertex ids and their degrees |
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.degrees(degree_option="out")
>>> result.inspect()
[#] id degree
===============
[0] 1 3
[1] 2 1
[2] 3 0
[3] 4 1
[4] 5 0
>>> result = graph.degrees(degree_option="in")
>>> result.inspect()
[#] id degree
===============
[0] 1 0
[1] 2 1
[2] 3 2
[3] 4 1
[4] 5 1
def degrees(self, degree_option='undirected'):
    """
    **Degree Calculation**

    The degree of a vertex is the number of edges adjacent to it, and is a
    fundamental quantity in graph analysis.

    When the edge relation is directed, each vertex has an out-degree (the
    number of edges leaving the vertex) as well as an in-degree (the number
    of edges entering the vertex).

    Parameters
    ----------

    :param degree_option: (String) Either in, out or undirected. String describing the direction of edges
    :return: (Frame) Frame containing the vertex ids and their degrees

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.degrees(degree_option="out")
        >>> result.inspect()
        [#]  id  degree
        ===============
        [0]   1       3
        [1]   2       1
        [2]   3       0
        [3]   4       1
        [4]   5       0

        >>> result = graph.degrees(degree_option="in")
        >>> result.inspect()
        [#]  id  degree
        ===============
        [0]   1       0
        [1]   2       1
        [2]   3       2
        [3]   4       1
        [4]   5       1

    """
    from sparktk.frame.frame import Frame

    tc = self._tc
    # The option string is handed to the Scala side unchanged.
    scala_degrees = self._scala.degree(degree_option)
    return Frame(tc, scala_degrees)
def export_to_orientdb(
self, db_url, user_name, password, root_password, vertex_type_column_name=None, edge_type_column_name=None, batch_size=1000)
Exports the Spark-tk Graph (GraphFrame) to OrientDB. The API creates an OrientDB database with the given database name, URL and credentials. It exports the vertex and edge dataframe schemas to OrientDB based on the given vertex_type_column_name and edge_type_column_name. If either of them is None, the corresponding elements are exported to the base type class.
:param:(str) db_url: OrientDB URI :param:(str) user_name: the database username :param:(str) password: the database password :param:(str) root_password: OrientDB server password :param:(Optional(str)) vertex_type_column_name: column name from the vertex data frame specified to be the vertex type :param:(Optional(str)) edge_type_column_name: column name from the edge data frame specified to be the edge type :param:(int) batch_size: batch size for graph ETL to OrientDB database
>>> v = tc.frame.create([("a", "Alice", 34,"F"),
... ("b", "Bob", 36,"M"),
... ("c", "Charlie", 30,"M"),
... ("d", "David", 29,"M"),
... ("e", "Esther", 32,"F"),
... ("f", "Fanny", 36,"F"),
... ], ["id", "name", "age","gender"])
>>> e = tc.frame.create([("a", "b", "friend"),
... ("b", "c", "follow"),
... ("c", "b", "follow"),
... ("f", "c", "follow"),
... ("e", "f", "follow"),
... ("e", "d", "friend"),
... ("d", "a", "friend"),
... ("a", "e", "friend")
... ], ["src", "dst", "relationship"])
>>> sparktk_graph = tc.graph.create(v,e)
>>> db = "test_db"
>>> result = sparktk_graph.export_to_orientdb(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password",vertex_type_column_name= "gender",edge_type_column_name="relationship")
>>> result
db_uri = remote:hostname:2424/test_db
edge_types = {u'follow': 4L, u'friend': 4L}
exported_edges_summary = {u'Total Exported Edges Count': 8L, u'Failure Count': 0L}
exported_vertices_summary = {u'Total Exported Vertices Count': 6L, u'Failure Count': 0L}
vertex_types = {u'M': 3L, u'F': 3L}
def export_to_orientdb(self, db_url, user_name, password, root_password,vertex_type_column_name=None, edge_type_column_name=None,batch_size=1000):
    """
    Exports the Spark-tk Graph (GraphFrame) to OrientDB. The API creates an OrientDB database with the given
    database name, URL and credentials. The vertex and edge dataframe schemas are exported to OrientDB based on
    the given vertex_type_column_name and edge_type_column_name. When either of them is None, the export goes
    to the base type class.

    Parameters
    ----------

    :param db_url: (str) OrientDB URI
    :param user_name: (str) the database username
    :param password: (str) the database password
    :param root_password: (str) OrientDB server password
    :param vertex_type_column_name: (Optional(str)) column name from the vertex data frame specified to be the vertex type
    :param edge_type_column_name: (Optional(str)) column name from the edge data frame specified to be the edge type
    :param batch_size: (int) batch size for graph ETL to OrientDB database
    :return: (ExportToOrientdbReturn) summary of the export, as shown in the example below

    Example
    -------

        >>> v = tc.frame.create([("a", "Alice", 34,"F"),
        ...                      ("b", "Bob", 36,"M"),
        ...                      ("c", "Charlie", 30,"M"),
        ...                      ("d", "David", 29,"M"),
        ...                      ("e", "Esther", 32,"F"),
        ...                      ("f", "Fanny", 36,"F"),
        ...                      ], ["id", "name", "age","gender"])

        >>> e = tc.frame.create([("a", "b", "friend"),
        ...                      ("b", "c", "follow"),
        ...                      ("c", "b", "follow"),
        ...                      ("f", "c", "follow"),
        ...                      ("e", "f", "follow"),
        ...                      ("e", "d", "friend"),
        ...                      ("d", "a", "friend"),
        ...                      ("a", "e", "friend")
        ...                      ], ["src", "dst", "relationship"])

        >>> sparktk_graph = tc.graph.create(v,e)

        >>> db = "test_db"

        >>> result = sparktk_graph.export_to_orientdb(db_url="remote:hostname:2424/%s" % db,user_name= "admin",password = "admin",root_password = "orientdb_server_root_password",vertex_type_column_name= "gender",edge_type_column_name="relationship")

        >>> result
        db_uri = remote:hostname:2424/test_db
        edge_types = {u'follow': 4L, u'friend': 4L}
        exported_edges_summary = {u'Total Exported Edges Count': 8L, u'Failure Count': 0L}
        exported_vertices_summary = {u'Total Exported Vertices Count': 6L, u'Failure Count': 0L}
        vertex_types = {u'M': 3L, u'F': 3L}

    """
    # The two optional column names are converted to Scala Options before the call.
    to_option = self._tc._jutils.convert.to_scala_option
    scala_vertex_type = to_option(vertex_type_column_name)
    scala_edge_type = to_option(edge_type_column_name)

    scala_result = self._scala.exportToOrientdb(db_url,
                                                user_name,
                                                password,
                                                root_password,
                                                scala_vertex_type,
                                                scala_edge_type,
                                                batch_size)
    return ExportToOrientdbReturn(self._tc, scala_result)
def global_clustering_coefficient(
self)
The clustering coefficient of a graph provides a measure of how tightly clustered an undirected graph is.
More formally:
.. math::
cc(G) = \frac{ \| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\}, \{v,w \} \in E \} \| }{\| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\} \in E \} \|}
Returns | (Double): | The global clustering coefficient of the graph |
The clustering coefficient on a graph with some triangles will be
greater than 0
>>> vertex_schema = [('id', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_schema = [('src', int), ('dst', int)]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> graph.global_clustering_coefficient()
0.5
The clustering coefficient on a graph with no triangles (a tree) is
0
>>> vertex_rows_star = [ [1], [2], [3], [4]]
>>> vertex_frame_star = tc.frame.create(vertex_rows_star, vertex_schema)
>>> edge_rows_star = [ [1, 2], [1, 3], [1, 4]]
>>> edge_frame_star = tc.frame.create(edge_rows_star, edge_schema)
>>> graph_star = tc.graph.create(vertex_frame_star, edge_frame_star)
>>> graph_star.global_clustering_coefficient()
0.0
def global_clustering_coefficient(self):
    r"""
    The clustering coefficient of a graph provides a measure of how tightly
    clustered an undirected graph is.

    More formally:

    .. math::

        cc(G) = \frac{ \| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\}, \{v,w \} \in E \} \| }
                     { \| \{ (u,v,w) \in V^3: \ \{u,v\}, \{u, w\} \in E \} \| }

    Parameters
    ----------

    :return: (Double) The global clustering coefficient of the graph

    Examples
    --------

    The clustering coefficient on a graph with some triangles will be
    greater than 0

        >>> vertex_schema = [('id', int)]
        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)

        >>> edge_schema = [('src', int), ('dst', int)]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> graph.global_clustering_coefficient()
        0.5

    The clustering coefficient on a graph with no triangles (a tree) is
    0

        >>> vertex_rows_star = [ [1], [2], [3], [4]]
        >>> vertex_frame_star = tc.frame.create(vertex_rows_star, vertex_schema)

        >>> edge_rows_star = [ [1, 2], [1, 3], [1, 4]]
        >>> edge_frame_star = tc.frame.create(edge_rows_star, edge_schema)

        >>> graph_star = tc.graph.create(vertex_frame_star, edge_frame_star)

        >>> graph_star.global_clustering_coefficient()
        0.0

    """
    # NOTE: the docstring above is a raw string on purpose. In a normal string
    # literal the "\f" of "\frac" is a form-feed escape, which mangles the
    # formula wherever the docstring is rendered.
    return self._scala.globalClusteringCoefficient()
def label_propagation(
self, max_iterations)
Assigns label based off of proximity to different vertices. The labels are initially 1 unique label per vertex (the vertex id), and as the algorithm runs some of these get erased
Note this algorithm is neither guaranteed to converge, nor guaranteed to converge to the correct value.
This calls graph frames label propagation which can be found at
http://graphframes.github.io/api/scala/index.html#org.graphframes.lib.LabelPropagation
Returns | (Frame): | Frame containing the vertex id's and the community they are a member of |
>>> vertex_schema = [('id', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.label_propagation(10)
>>> result.inspect()
[#] id label
==============
[0] 1 1
[1] 2 2
[2] 3 2
[3] 4 2
[4] 5 1
def label_propagation(self, max_iterations):
    """
    Assigns labels based off of proximity to different vertices. Initially
    every vertex carries 1 unique label (its vertex id), and as the
    algorithm runs some of these get erased

    Note this algorithm is neither guaranteed to converge, nor guaranteed to
    converge to the correct value.

    This calls graph frames label propagation which can be found at

    http://graphframes.github.io/api/scala/index.html#org.graphframes.lib.LabelPropagation

    Parameters
    ----------

    :param max_iterations: (int) Iteration limit handed to the graph frames label propagation
    :return: (Frame) Frame containing the vertex id's and the community they are a member of

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]

        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.label_propagation(10)
        >>> result.inspect()
        [#]  id  label
        ==============
        [0]   1      1
        [1]   2      2
        [2]   3      2
        [3]   4      2
        [4]   5      1

    """
    from sparktk.frame.frame import Frame

    tc = self._tc
    scala_labels = self._scala.labelPropagation(max_iterations)
    return Frame(tc, scala_labels)
def loopy_belief_propagation(
self, prior, edge_weight, max_iterations=10)
Performs loopy belief propagation on a graph representing a Potts model. This optimizes based off of user provided priors.
prior | (String): | The name of the column of space delimited string of floats representing the prior distribution on a vertex |
edge_weight | (String): | The name of the column of weight value on edges |
max_iterations: | The number of iterations to run for |
>>> vertex_schema = [('id', int), ('label', float), ("prior_val", str), ("was_labeled", int)]
>>> vertex_rows = [ [1, 1, "0.7 0.3", 1], [2, 1, "0.7 0.3", 1], [3, 5, "0.7 0.3", 0], [4, 5, "0.7 0.3", 0], [5, 5, "0.7 0.3", 1] ]
>>> edge_schema = [('src', int), ('dst', int), ('weight', int)]
>>> edge_rows = [ [1, 2, 2], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> vertex_frame.inspect()
[#] id label prior_val was_labeled
======================================
[0] 1 1.0 0.7 0.3 1
[1] 2 1.0 0.7 0.3 1
[2] 3 5.0 0.7 0.3 0
[3] 4 5.0 0.7 0.3 0
[4] 5 5.0 0.7 0.3 1
>>> result = graph.loopy_belief_propagation("prior_val", "weight", 2)
>>> result.inspect()
[#] id label prior_val was_labeled
======================================
[0] 1 1.0 0.7 0.3 1
[1] 2 1.0 0.7 0.3 1
[2] 3 5.0 0.7 0.3 0
[3] 4 5.0 0.7 0.3 0
[4] 5 5.0 0.7 0.3 1
<BLANKLINE>
[#] posterior
==============================================
[0] [0.9883347610773112,0.011665238922688819]
[1] [0.9743014865548763,0.025698513445123698]
[2] [0.9396772870897875,0.06032271291021254]
[3] [0.9319529856190276,0.06804701438097235]
[4] [0.8506957305238876,0.1493042694761125]
def loopy_belief_propagation(self, prior, edge_weight, max_iterations=10):
    """
    Runs loopy belief propagation on a graph representing a Potts model. The optimization is driven by
    user provided priors.

    Parameters
    ----------

    :param prior: (String) The name of the column of space delimited string of floats representing the prior distribution on a vertex
    :param edge_weight: (String) The name of the column of weight value on edges
    :param max_iterations: The number of iterations to run for
    :return: (Frame) Frame of the vertices; in the example below it carries an added posterior column

    Examples
    --------

        >>> vertex_schema = [('id', int), ('label', float), ("prior_val", str), ("was_labeled", int)]
        >>> vertex_rows = [ [1, 1, "0.7 0.3", 1], [2, 1, "0.7 0.3", 1], [3, 5, "0.7 0.3", 0], [4, 5, "0.7 0.3", 0], [5, 5, "0.7 0.3", 1] ]

        >>> edge_schema = [('src', int), ('dst', int), ('weight', int)]
        >>> edge_rows = [ [1, 2, 2], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ]

        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)
        >>> vertex_frame.inspect()
        [#]  id  label  prior_val  was_labeled
        ======================================
        [0]   1    1.0  0.7 0.3              1
        [1]   2    1.0  0.7 0.3              1
        [2]   3    5.0  0.7 0.3              0
        [3]   4    5.0  0.7 0.3              0
        [4]   5    5.0  0.7 0.3              1

        >>> result = graph.loopy_belief_propagation("prior_val", "weight", 2)
        >>> result.inspect()
        [#]  id  label  prior_val  was_labeled
        ======================================
        [0]   1    1.0  0.7 0.3              1
        [1]   2    1.0  0.7 0.3              1
        [2]   3    5.0  0.7 0.3              0
        [3]   4    5.0  0.7 0.3              0
        [4]   5    5.0  0.7 0.3              1
        <BLANKLINE>
        [#]  posterior
        ==============================================
        [0]  [0.9883347610773112,0.011665238922688819]
        [1]  [0.9743014865548763,0.025698513445123698]
        [2]  [0.9396772870897875,0.06032271291021254]
        [3]  [0.9319529856190276,0.06804701438097235]
        [4]  [0.8506957305238876,0.1493042694761125]

    """
    from sparktk.frame.frame import Frame

    tc = self._tc
    scala_posteriors = self._scala.loopyBeliefPropagation(prior, edge_weight, max_iterations)
    return Frame(tc, scala_posteriors)
def page_rank(
self, convergence_tolerance=None, reset_probability=None, max_iterations=None)
Page Rank
Page Rank is a popular statistic that ranks vertices based off of connectivity in the global graph
Exactly 1 of convergence_tolerance and max_iterations must be set (termination criteria)
:convergence_tolerance: (Float) If the difference between successive iterations is less than this, the algorithm terminates. Mutually exclusive with max_iterations :reset_probability: (Float) Value for the reset probability in the page rank algorithm :max_iterations: (Int) Maximum number of iterations the page rank should run before terminating. Mutually exclusive with convergence_tolerance
Returns | (Frame): | Frame containing the vertex id's and their page rank |
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.page_rank(max_iterations=20)
>>> result.inspect()
[#] id pagerank
=================
[0] 1 0.15
[1] 2 0.1925
[2] 3 0.356125
[3] 4 0.1925
[4] 5 0.313625
def page_rank(self, convergence_tolerance=None, reset_probability=None, max_iterations=None):
    """
    **Page Rank**

    Page Rank is a popular statistic that ranks vertices based off of
    connectivity in the global graph

    Exactly 1 of convergence_tolerance and max_iterations must be set (termination criteria)

    Parameters
    ----------

    :param convergence_tolerance: (Float) If the difference between successive iterations is less than this, the algorithm terminates. Mutually exclusive with max_iterations
    :param reset_probability: (Float) Value for the reset probability in the page rank algorithm
    :param max_iterations: (Int) Maximum number of iterations the page rank should run before terminating. Mutually exclusive with convergence_tolerance
    :return: (Frame) Frame containing the vertex id's and their page rank

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.page_rank(max_iterations=20)
        >>> result.inspect()
        [#]  id  pagerank
        =================
        [0]   1      0.15
        [1]   2    0.1925
        [2]   3  0.356125
        [3]   4    0.1925
        [4]   5  0.313625

    """
    from sparktk.frame.frame import Frame

    tc = self._tc
    to_option = tc.jutils.convert.to_scala_option

    # Unset (None) arguments are passed to Scala as empty Options.
    iterations_option = to_option(max_iterations)
    reset_option = to_option(reset_probability)
    tolerance_option = to_option(convergence_tolerance)

    # Note the Scala call takes the arguments in a different order than
    # this method's own signature.
    scala_ranks = self._scala.pageRank(iterations_option, reset_option, tolerance_option)
    return Frame(tc, scala_ranks)
def save(
self, path)
Persists the graph to the given file path
def save(self, path):
    """
    Persists the graph to the given file path

    :param path: location the graph is written to; handed to the Scala graph's save
    """
    scala_graph = self._scala
    scala_graph.save(path)
def triangle_count(
self)
Counts the number of triangles each vertex is a part of
Returns | (Frame): | Frame containing the vertex ids and the number of triangles each vertex is in |
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.triangle_count()
>>> result.inspect()
[#] count id
==============
[0] 1 1
[1] 1 2
[2] 1 3
[3] 0 4
[4] 0 5
def triangle_count(self):
    """
    Counts, for every vertex, how many triangles that vertex is a part of

    Parameters
    ----------

    :return: (Frame) Frame containing the vertex ids and the number of triangles each vertex is in

    Examples
    --------

        >>> vertex_schema = [('id', int)]
        >>> edge_schema = [('src', int), ('dst', int)]

        >>> vertex_rows = [ [1], [2], [3], [4], [5] ]
        >>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.triangle_count()
        >>> result.inspect()
        [#]  count  id
        ==============
        [0]      1   1
        [1]      1   2
        [2]      1   3
        [3]      0   4
        [4]      0   5

    """
    from sparktk.frame.frame import Frame

    tc = self._tc
    scala_counts = self._scala.triangleCount()
    return Frame(tc, scala_counts)
def vertex_count(
self)
Returns the number of rows in the vertices frame in this graph
>>> from graphframes import examples
>>> gf = examples.Graphs(tc.sql_context).friends()
>>> from sparktk.graph.graph import Graph
>>> g = Graph(tc, gf)
>>> g.vertex_count()
6
>>> v = tc.frame.create([("a", "Alice", 34),
... ("b", "Bob", 36),
... ("c", "Charlie", 30),
... ("d", "David", 29),
... ("e", "Esther", 32),
... ("f", "Fanny", 36),
... ], ["id", "name", "age"])
>>> e = tc.frame.create([("a", "b", "friend"),
... ("b", "c", "follow"),
... ("c", "b", "follow"),
... ("f", "c", "follow"),
... ("e", "f", "follow"),
... ("e", "d", "friend"),
... ("d", "a", "friend"),
... ("a", "e", "friend")
... ], ["src", "dst", "relationship"])
>>> g2 = tc.graph.create(v, e)
>>> g2.vertex_count()
6
def vertex_count(self):
    """
    Reports how many rows the vertices frame of this graph has

    :return: (int) the vertex count, converted to a python int

    Example
    -------

        >>> from graphframes import examples
        >>> gf = examples.Graphs(tc.sql_context).friends()
        >>> from sparktk.graph.graph import Graph
        >>> g = Graph(tc, gf)
        >>> g.vertex_count()
        6

        >>> v = tc.frame.create([("a", "Alice", 34),
        ...                      ("b", "Bob", 36),
        ...                      ("c", "Charlie", 30),
        ...                      ("d", "David", 29),
        ...                      ("e", "Esther", 32),
        ...                      ("f", "Fanny", 36),
        ...                      ], ["id", "name", "age"])

        >>> e = tc.frame.create([("a", "b", "friend"),
        ...                      ("b", "c", "follow"),
        ...                      ("c", "b", "follow"),
        ...                      ("f", "c", "follow"),
        ...                      ("e", "f", "follow"),
        ...                      ("e", "d", "friend"),
        ...                      ("d", "a", "friend"),
        ...                      ("a", "e", "friend")
        ...                      ], ["src", "dst", "relationship"])

        >>> g2 = tc.graph.create(v, e)
        >>> g2.vertex_count()
        6

    """
    scala_count = self._scala.vertexCount()
    # Coerce whatever the Scala call hands back into a python int.
    return int(scala_count)
def weighted_degrees(
self, edge_weight, degree_option='undirected', default_weight=0.0)
Degree Calculation
A fundamental quantity in graph analysis is the degree of a vertex: The degree of a vertex is the number of edges adjacent to it.
For a directed edge relation, a vertex has both an out-degree (the number of edges leaving the vertex) and an in-degree (the number of edges entering the vertex).
In the presence of edge weights, vertices can have weighted degrees: The weighted degree of a vertex is the sum of weights of edges adjacent to it. Analogously, the weighted in-degree of a vertex is the sum of the weights of the edges entering it, and the weighted out-degree is the sum of the weights of the edges leaving the vertex. If a property is missing or empty on particular vertex, the default weight is used.
edge_weight | (String): | Name of the property that contains an edge weight |
degree_option | (String): | Either in, out or undirected. String describing the direction of edges |
default_weight | (Numeric): | Default weight value if a vertex has no value for the edge weight property |
Returns | (Frame): | Frame containing the vertex ids and their weighted degrees |
>>> vertex_schema = [('id', int), ('label', float)]
>>> edge_schema = [('src', int), ('dst', int), ('weight', int)]
>>> vertex_rows = [ [1, 1], [2, 1], [3, 5], [4, 5], [5, 5] ]
>>> edge_rows = [ [1, 2, 2], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.weighted_degrees(edge_weight="weight", degree_option="out")
>>> result.inspect()
[#] id degree
===============
[0] 1 4
[1] 2 1
[2] 3 0
[3] 4 1
[4] 5 0
>>> result = graph.weighted_degrees(edge_weight="weight", degree_option="in")
>>> result.inspect()
[#] id degree
===============
[0] 1 0
[1] 2 2
[2] 3 2
[3] 4 1
[4] 5 1
def weighted_degrees(self, edge_weight, degree_option='undirected', default_weight=0.0):
    """
    **Degree Calculation**

    The degree of a vertex is the number of edges adjacent to it, and is a
    fundamental quantity in graph analysis.

    When the edge relation is directed, each vertex has an out-degree (the
    number of edges leaving the vertex) as well as an in-degree (the number
    of edges entering the vertex).

    When edges carry weights, vertices can have weighted degrees: the
    weighted degree of a vertex is the sum of weights of edges adjacent to it.
    Analogously, the weighted in-degree of a vertex is the sum of the weights
    of the edges entering it, and the weighted out-degree is the sum of the
    weights of the edges leaving the vertex. If a property is missing or
    empty on a particular vertex, the default weight is used.

    Parameters
    ----------

    :param edge_weight: (String) Name of the property that contains an edge weight
    :param degree_option: (String) Either in, out or undirected. String describing the direction of edges
    :param default_weight: (Numeric) Default weight value if a vertex has no value for the edge weight property
    :return: (Frame) Frame containing the vertex ids and their weighted degrees

    Examples
    --------

        >>> vertex_schema = [('id', int), ('label', float)]
        >>> edge_schema = [('src', int), ('dst', int), ('weight', int)]

        >>> vertex_rows = [ [1, 1], [2, 1], [3, 5], [4, 5], [5, 5] ]
        >>> edge_rows = [ [1, 2, 2], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ]
        >>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
        >>> edge_frame = tc.frame.create(edge_rows, edge_schema)

        >>> graph = tc.graph.create(vertex_frame, edge_frame)

        >>> result = graph.weighted_degrees(edge_weight="weight", degree_option="out")
        >>> result.inspect()
        [#]  id  degree
        ===============
        [0]   1       4
        [1]   2       1
        [2]   3       0
        [3]   4       1
        [4]   5       0

        >>> result = graph.weighted_degrees(edge_weight="weight", degree_option="in")
        >>> result.inspect()
        [#]  id  degree
        ===============
        [0]   1       0
        [1]   2       2
        [2]   3       2
        [3]   4       1
        [4]   5       1

    """
    from sparktk.frame.frame import Frame

    tc = self._tc
    # All three arguments are forwarded to the Scala side unchanged.
    scala_degrees = self._scala.weightedDegree(edge_weight, degree_option, default_weight)
    return Frame(tc, scala_degrees)