sparktk graph
Functions
def create(
source_or_vertices_frame, edges_frame=None, tc=<class 'sparktk.arguments.implicit'>)
Create a sparktk Graph from two sparktk Frames (or from some other graph source).
source_or_vertices_frame | A graph source or a vertices frame. Valid sources include a Python or Spark GraphFrame, or a Scala Graph. Otherwise, if a vertices frame is provided, the edges_frame argument must also be supplied. A vertices frame defines the vertices of the graph and must have a schema with a column named "id" that provides a unique vertex ID. All other columns are treated as vertex properties. If a column named "vertex_type" is also present, it is used as a special label that denotes the vertex type, for example when interfacing with logic (such as a graph DB) that expects a specific vertex type. |
edges_frame | (valid only if the source_or_vertices_frame argument is a vertices Frame): | An edges frame defines the edges of the graph; its schema must have columns named "src" and "dst" that provide the source and destination vertex IDs of each edge. All other columns are treated as edge properties. If a column named "edge_type" is also present, it is used as a special label that denotes the edge type, for example when interfacing with logic (such as a graph DB) that expects a specific edge type. |
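The column conventions above can be checked on plain Python data before any frames are built. The following is a minimal sketch using a hypothetical helper, `validate_graph_frames`, which is not part of sparktk; sparktk performs its own validation inside `tc.graph.create`, and this sketch does not claim to mirror it.

```python
def validate_graph_frames(vertex_columns, vertex_rows, edge_columns):
    """Check the vertices/edges column conventions on plain Python rows.

    Hypothetical helper for illustration only (not part of sparktk).
    Returns (vertex_props, vertex_type_col, edge_props, edge_type_col).
    """
    # Required key columns.
    if "id" not in vertex_columns:
        raise ValueError('vertices frame must have an "id" column')
    for name in ("src", "dst"):
        if name not in edge_columns:
            raise ValueError('edges frame must have a "%s" column' % name)

    # Vertex IDs must be unique.
    id_idx = vertex_columns.index("id")
    ids = [row[id_idx] for row in vertex_rows]
    if len(set(ids)) != len(ids):
        raise ValueError("vertex ids must be unique")

    # Every non-key column is a property; the optional type columns are
    # also reported separately because they double as type labels.
    vertex_props = [c for c in vertex_columns if c != "id"]
    edge_props = [c for c in edge_columns if c not in ("src", "dst")]
    vertex_type_col = "vertex_type" if "vertex_type" in vertex_columns else None
    edge_type_col = "edge_type" if "edge_type" in edge_columns else None
    return vertex_props, vertex_type_col, edge_props, edge_type_col
```

For the people/relationship frames used in the examples below, this reports "name", "age", and "gender" as vertex properties and "relationship" as an edge property, with no type label columns.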
def import_orientdb_graph(
db_url, user_name, password, root_password, tc=<class 'sparktk.arguments.implicit'>)
Import a graph from OrientDB into spark-tk as a spark-tk graph (Spark GraphFrame).
Parameters
----------
:param:(str) db_url: OrientDB URI
:param:(str) user_name: the database username
:param:(str) password: the database password
:param:(str) root_password: OrientDB server password
Example
-------
>>> v = tc.frame.create([("a", "Alice", 34,"F"),
... ("b", "Bob", 36,"M"),
... ("c", "Charlie", 30,"M"),
... ("d", "David", 29,"M"),
... ("e", "Esther", 32,"F"),
... ("f", "Fanny", 36,"F"),
... ], ["id", "name", "age","gender"])
>>> e = tc.frame.create([("a", "b", "friend"),
... ("b", "c", "follow"),
... ("c", "b", "follow"),
... ("f", "c", "follow"),
... ("e", "f", "follow"),
... ("e", "d", "friend"),
... ("d", "a", "friend"),
... ("a", "e", "friend")
... ], ["src", "dst", "relationship"])
>>> sparktk_graph = tc.graph.create(v,e)
>>> db = "test_db"
>>> sparktk_graph.export_to_orientdb(db_url="remote:hostname:2424/%s" % db, user_name="admin", password="admin",
... root_password="orientdb_server_root_password",
... vertex_type_column_name="gender", edge_type_column_name="relationship")
>>> imported_gf = tc.graph.import_orientdb_graph(db_url="remote:hostname:2424/%s" % db, user_name="admin",
... password="admin", root_password="orientdb_server_root_password")
>>> imported_gf.graphframe.vertices.show()
+-------+------+---+---+
|   name|gender| id|age|
+-------+------+---+---+
|    Bob|     M|  b| 36|
|  David|     M|  d| 29|
|Charlie|     M|  c| 30|
|  Alice|     F|  a| 34|
| Esther|     F|  e| 32|
|  Fanny|     F|  f| 36|
+-------+------+---+---+
>>> imported_gf.graphframe.edges.show()
+---+------------+---+
|dst|relationship|src|
+---+------------+---+
|  f|      follow|  e|
|  b|      follow|  c|
|  c|      follow|  b|
|  c|      follow|  f|
|  b|      friend|  a|
|  a|      friend|  d|
|  d|      friend|  e|
|  e|      friend|  a|
+---+------------+---+
def load(
path, tc=<class 'sparktk.arguments.implicit'>)
Load a Graph from the given path.
Classes
class Graph
sparktk Graph
Represents a graph with one frame defining the vertices and another frame defining the edges. It is implemented as a very thin wrapper around the Spark GraphFrame (https://github.com/graphframes/graphframes), to which additional methods are added.
A vertices frame defines the vertices of the graph and must have a schema with a column named "id" that provides a unique vertex ID. All other columns are treated as vertex properties. If a column named "vertex_type" is also present, it is used as a special label that denotes the vertex type, for example when interfacing with logic (such as a graph DB) that expects a specific vertex type.
An edges frame defines the edges of the graph; its schema must have columns named "src" and "dst" that provide the source and destination vertex IDs of each edge. All other columns are treated as edge properties. If a column named "edge_type" is also present, it is used as a special label that denotes the edge type, for example when interfacing with logic (such as a graph DB) that expects a specific edge type.
Unlike sparktk Frames, this Graph is immutable (it cannot be changed). The vertices and edges may be extracted as sparktk Frame objects, but those frames then take on a life of their own apart from this graph object. Those frames could be transformed and used to build a new graph object if necessary.
The underlying spark GraphFrame is available as the 'graphframe' property of this object.
>>> viewers = tc.frame.create([['fred', 0],
... ['wilma', 0],
... ['pebbles', 1],
... ['betty', 0],
... ['barney', 0],
... ['bamm bamm', 1]],
... schema=[('id', str), ('kids', int)])
>>> titles = ['Croods', 'Jurassic Park', '2001', 'Ice Age', 'Land Before Time']
>>> movies = tc.frame.create([[t] for t in titles], schema=[('id', str)])
>>> vertices = viewers.copy()
>>> vertices.append(movies)
>>> vertices.inspect(20)
[##] id kids
============================
[0] fred 0
[1] wilma 0
[2] pebbles 1
[3] betty 0
[4] barney 0
[5] bamm bamm 1
[6] Croods None
[7] Jurassic Park None
[8] 2001 None
[9] Ice Age None
[10] Land Before Time None
>>> edges = tc.frame.create([['fred','Croods',5],
... ['fred','Jurassic Park',5],
... ['fred','2001',2],
... ['fred','Ice Age',4],
... ['wilma','Jurassic Park',3],
... ['wilma','2001',5],
... ['wilma','Ice Age',4],
... ['pebbles','Croods',4],
... ['pebbles','Land Before Time',3],
... ['pebbles','Ice Age',5],
... ['betty','Croods',5],
... ['betty','Jurassic Park',3],
... ['betty','Land Before Time',4],
... ['betty','Ice Age',3],
... ['barney','Croods',5],
... ['barney','Jurassic Park',5],
... ['barney','Land Before Time',3],
... ['barney','Ice Age',5],
... ['bamm bamm','Croods',5],
... ['bamm bamm','Land Before Time',3]],
... schema=['src', 'dst', 'rating'])
>>> edges.inspect(20)
[##] src dst rating
=========================================
[0] fred Croods 5
[1] fred Jurassic Park 5
[2] fred 2001 2
[3] fred Ice Age 4
[4] wilma Jurassic Park 3
[5] wilma 2001 5
[6] wilma Ice Age 4
[7] pebbles Croods 4
[8] pebbles Land Before Time 3
[9] pebbles Ice Age 5
[10] betty Croods 5
[11] betty Jurassic Park 3
[12] betty Land Before Time 4
[13] betty Ice Age 3
[14] barney Croods 5
[15] barney Jurassic Park 5
[16] barney Land Before Time 3
[17] barney Ice Age 5
[18] bamm bamm Croods 5
[19] bamm bamm Land Before Time 3
>>> graph = tc.graph.create(vertices, edges)
>>> graph
Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])
>>> graph.save("sandbox/old_movie_graph")
>>> restored = tc.load("sandbox/old_movie_graph")
>>> restored
Graph(v:[id: string, kids: int], e:[src: string, dst: string, rating: int])
Ancestors (in MRO)
- Graph
- __builtin__.object
Instance variables
var graphframe
The underlying graphframe object which exposes several methods and properties
Methods
def __init__(
self, tc, source_or_vertices_frame, edges_frame=None)
def clustering_coefficient(
self)
The clustering coefficient of a vertex provides a measure of how tightly clustered that vertex's neighborhood is.
Formally:
.. math::
cc(v) = \frac{ \left| \{ (u,v,w) \in V^3 : \{u,v\}, \{u,w\}, \{v,w\} \in E \} \right| }{ \left| \{ (u,v,w) \in V^3 : \{v,u\}, \{v,w\} \in E \} \right| }
For further reading on clustering coefficients, see http://en.wikipedia.org/wiki/Clustering_coefficient.
This method returns a frame that associates each vertex ID with its local clustering coefficient.
Returns | (Frame): | Frame containing the vertex IDs and their clustering coefficients |
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.clustering_coefficient()
>>> result.inspect()
[#] id clustering_coefficient
===============================
[0] 1 0.333333333333
[1] 2 1.0
[2] 3 1.0
[3] 4 0.0
[4] 5 0.0
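To make the definition concrete, here is a minimal pure-Python sketch of the local clustering coefficient. It assumes edges are treated as undirected, self-loops are ignored, and a vertex with fewer than two neighbors gets 0.0 (as vertex 5 does above). The function name `local_clustering` is hypothetical; this is not the sparktk implementation.

```python
from itertools import combinations


def local_clustering(vertices, edges):
    """Fraction of each vertex's neighbor pairs that are themselves connected."""
    # Build an undirected adjacency map, skipping self-loops.
    nbrs = {v: set() for v in vertices}
    for s, d in edges:
        if s != d:
            nbrs[s].add(d)
            nbrs[d].add(s)

    cc = {}
    for v in vertices:
        pairs = list(combinations(sorted(nbrs[v]), 2))  # wedges centered on v
        if not pairs:
            cc[v] = 0.0  # fewer than two neighbors: no wedges to close
            continue
        closed = sum(1 for u, w in pairs if w in nbrs[u])  # wedges closed by an edge u-w
        cc[v] = closed / float(len(pairs))
    return cc
```

On the same five edges it yields 1/3 for vertex 1 (only the pair 2-3 of its three neighbor pairs is connected), 1.0 for vertices 2 and 3, and 0.0 for vertices 4 and 5, in line with the frame above.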
def connected_components(
self)
Connected components groups the vertices of a graph so that two vertices are in the same component if and only if there is a path between them. This method returns a frame with the vertices and their corresponding components.
Returns | (Frame): | Frame containing the vertex IDs and their components |
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.connected_components()
>>> result.inspect()
[#] id component
==================
[0] 1 1
[1] 2 1
[2] 3 1
[3] 4 4
[4] 5 4
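The grouping can be illustrated with a union-find (disjoint-set) sketch. It labels each component by its smallest vertex ID, which happens to agree with the example output; whether sparktk always labels components this way is an assumption, not something stated above. The function name is hypothetical.

```python
def connected_components(vertices, edges):
    """Label every vertex with the smallest vertex ID in its component."""
    parent = {v: v for v in vertices}

    def find(x):
        # Walk to the root, halving the path as we go to keep trees shallow.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for s, d in edges:  # edge direction is ignored
        rs, rd = find(s), find(d)
        if rs != rd:
            # The smaller root wins, so each root stays the minimum of its tree.
            if rd < rs:
                rs, rd = rd, rs
            parent[rd] = rs
    return {v: find(v) for v in vertices}
```

Because the smaller root always becomes the parent, every root remains the minimum ID of its tree, so the returned labels are stable regardless of edge order.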
def create_edges_frame(
self)
Creates a frame representing the edges stored in this graph
def create_vertices_frame(
self)
Creates a frame representing the vertices stored in this graph
def degrees(
self, degree_option='undirected')
Degree Calculation
A fundamental quantity in graph analysis is the degree of a vertex: The degree of a vertex is the number of edges adjacent to it.
For a directed edge relation, a vertex has both an out-degree (the number of edges leaving the vertex) and an in-degree (the number of edges entering the vertex).
degree_option | (String): | Either "in", "out", or "undirected"; describes which edge directions are counted |
Returns | (Frame): | Frame containing the vertex IDs and their degrees |
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.degrees(degree_option="out")
>>> result.inspect()
[#] id degree
===============
[0] 1 3
[1] 2 1
[2] 3 0
[3] 4 1
[4] 5 0
>>> result = graph.degrees(degree_option="in")
>>> result.inspect()
[#] id degree
===============
[0] 1 0
[1] 2 1
[2] 3 2
[3] 4 1
[4] 5 1
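The three degree options can be sketched in plain Python. This assumes that "undirected" counts every edge touching a vertex, i.e. in-degree plus out-degree; the function name `degrees` is hypothetical and not the sparktk method itself.

```python
from collections import Counter


def degrees(vertices, edges, degree_option="undirected"):
    """Count edges per vertex for the chosen direction option."""
    if degree_option not in ("in", "out", "undirected"):
        raise ValueError('degree_option must be "in", "out" or "undirected"')
    counts = Counter()
    for src, dst in edges:
        if degree_option in ("out", "undirected"):
            counts[src] += 1  # the edge leaves src
        if degree_option in ("in", "undirected"):
            counts[dst] += 1  # the edge enters dst
    # Counter returns 0 for vertices with no matching edges.
    return {v: counts[v] for v in vertices}
```

With the five example edges, "out" gives 3, 1, 0, 1, 0 and "in" gives 0, 1, 2, 1, 1 for vertices 1 to 5, matching the frames above; "undirected" gives their sum.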
def export_to_orientdb(
self, db_url, user_name, password, root_password, vertex_type_column_name=None, edge_type_column_name=None, batch_size=1000)
Export the spark-tk Graph (GraphFrame) to OrientDB. This API creates an OrientDB database with the given database name, URL, and credentials, and exports the vertex and edge DataFrame schemas to OrientDB based on the given vertex_type_column_name and edge_type_column_name. If either of them is None, the corresponding vertices or edges are exported to the base type class.
:param:(str) db_url: OrientDB URI
:param:(str) user_name: the database username
:param:(str) password: the database password
:param:(str) root_password: OrientDB server password
:param:(Optional(str)) vertex_type_column_name: column name from the vertex data frame specified to be the vertex type
:param:(Optional(str)) edge_type_column_name: column name from the edge data frame specified to be the edge type
:param:(int) batch_size: batch size for graph ETL to OrientDB database
>>> v = tc.frame.create([("a", "Alice", 34,"F"),
... ("b", "Bob", 36,"M"),
... ("c", "Charlie", 30,"M"),
... ("d", "David", 29,"M"),
... ("e", "Esther", 32,"F"),
... ("f", "Fanny", 36,"F"),
... ], ["id", "name", "age","gender"])
>>> e = tc.frame.create([("a", "b", "friend"),
... ("b", "c", "follow"),
... ("c", "b", "follow"),
... ("f", "c", "follow"),
... ("e", "f", "follow"),
... ("e", "d", "friend"),
... ("d", "a", "friend"),
... ("a", "e", "friend")
... ], ["src", "dst", "relationship"])
>>> sparktk_graph = tc.graph.create(v,e)
>>> db = "test_db"
>>> result = sparktk_graph.export_to_orientdb(db_url="remote:hostname:2424/%s" % db, user_name="admin", password="admin",
... root_password="orientdb_server_root_password",
... vertex_type_column_name="gender", edge_type_column_name="relationship")
>>> result
db_uri = remote:hostname:2424/test_db
edge_types = {u'follow': 4L, u'friend': 4L}
exported_edges_summary = {u'Total Exported Edges Count': 8L, u'Failure Count': 0L}
exported_vertices_summary = {u'Total Exported Vertices Count': 6L, u'Failure Count': 0L}
vertex_types = {u'M': 3L, u'F': 3L}
def global_clustering_coefficient(
self)
The clustering coefficient of a graph provides a measure of how tightly clustered an undirected graph is.
More formally:
.. math::
cc(G) = \frac{ \left| \{ (u,v,w) \in V^3 : \{u,v\}, \{u,w\}, \{v,w\} \in E \} \right| }{ \left| \{ (u,v,w) \in V^3 : \{u,v\}, \{u,w\} \in E \} \right| }
Returns | (Double): | The global clustering coefficient of the graph |
The clustering coefficient on a graph with some triangles will be greater than 0:
>>> vertex_schema = [('id', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_schema = [('src', int), ('dst', int)]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> graph.global_clustering_coefficient()
0.5
The clustering coefficient on a graph with no triangles (a tree) is 0:
>>> vertex_rows_star = [ [1], [2], [3], [4]]
>>> vertex_frame_star = tc.frame.create(vertex_rows_star, vertex_schema)
>>> edge_rows_star = [ [1, 2], [1, 3], [1, 4]]
>>> edge_frame_star = tc.frame.create(edge_rows_star, edge_schema)
>>> graph_star = tc.graph.create(vertex_frame_star, edge_frame_star)
>>> graph_star.global_clustering_coefficient()
0.0
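The formula can be read as "closed wedges divided by all wedges", where a wedge is a pair of edges sharing a center vertex. A minimal pure-Python sketch under that reading, treating edges as undirected and ignoring self-loops (the function name is hypothetical):

```python
from itertools import combinations


def global_clustering(edges):
    """Ratio of closed wedges to all wedges in an undirected graph."""
    nbrs = {}
    for s, d in edges:
        if s != d:
            nbrs.setdefault(s, set()).add(d)
            nbrs.setdefault(d, set()).add(s)

    wedges = 0
    closed = 0
    for center, ns in nbrs.items():
        for u, w in combinations(ns, 2):  # one wedge u-center-w
            wedges += 1
            if w in nbrs[u]:  # closed by the edge u-w, so part of a triangle
                closed += 1
    return closed / float(wedges) if wedges else 0.0
```

For the first example there are six wedges (three centered on vertex 1, one each on vertices 2, 3 and 4) and the single triangle closes three of them, giving 0.5; the star has three wedges and none closed, giving 0.0.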
def label_propagation(
self, max_iterations)
Assigns labels to vertices based on their proximity to other vertices. Initially, each vertex has its own unique label (its vertex ID); as the algorithm runs, some of these labels are erased as vertices adopt labels from their neighbors.
Note that this algorithm is neither guaranteed to converge nor guaranteed to converge to the correct value.
This calls the GraphFrames label propagation implementation, documented at http://graphframes.github.io/api/scala/index.html#org.graphframes.lib.LabelPropagation
Returns | (Frame): | Frame containing the vertex IDs and the community each vertex belongs to |
>>> vertex_schema = [('id', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.label_propagation(10)
>>> result.inspect()
[#] id label
==============
[0] 1 1
[1] 2 2
[2] 3 2
[3] 4 2
[4] 5 1
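The idea can be sketched as synchronous label propagation: on each iteration every vertex takes the most frequent label among its neighbors. Ties are broken here by the smallest label, which is an assumption of this sketch; the GraphFrames implementation may break ties differently, so this sketch is not expected to reproduce the labels above. The function name is hypothetical.

```python
from collections import Counter


def label_propagation(vertices, edges, max_iterations):
    """Synchronous label propagation; every vertex starts with its own ID."""
    nbrs = {v: [] for v in vertices}
    for s, d in edges:  # edge direction is ignored
        nbrs[s].append(d)
        nbrs[d].append(s)

    labels = {v: v for v in vertices}
    for _ in range(max_iterations):
        new_labels = {}
        for v in vertices:
            if not nbrs[v]:
                new_labels[v] = labels[v]  # isolated vertices keep their label
                continue
            counts = Counter(labels[u] for u in nbrs[v])
            best = max(counts.values())
            # Most frequent neighbor label; smallest label wins a tie.
            new_labels[v] = min(l for l, c in counts.items() if c == best)
        if new_labels == labels:
            break  # no vertex changed its label
        labels = new_labels
    return labels
```

With a single edge between two vertices, the synchronous update makes their labels swap on every iteration, which illustrates why convergence is not guaranteed.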
def loopy_belief_propagation(
self, prior, edge_weight, max_iterations=10)
Performs loopy belief propagation on a graph representing a Potts model. The optimization is driven by user-provided priors.
prior | (String): | The name of the vertex column containing a space-delimited string of floats that represents the prior distribution of each vertex |
edge_weight | (String): | The name of the edge column containing the edge weight |
max_iterations | (Int): | The number of iterations to run (default 10) |
>>> vertex_schema = [('id', int), ('label', float), ("prior_val", str), ("was_labeled", int)]
>>> vertex_rows = [ [1, 1, "0.7 0.3", 1], [2, 1, "0.7 0.3", 1], [3, 5, "0.7 0.3", 0], [4, 5, "0.7 0.3", 0], [5, 5, "0.7 0.3", 1] ]
>>> edge_schema = [('src', int), ('dst', int), ('weight', int)]
>>> edge_rows = [ [1, 2, 2], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> vertex_frame.inspect()
[#] id label prior_val was_labeled
======================================
[0] 1 1.0 0.7 0.3 1
[1] 2 1.0 0.7 0.3 1
[2] 3 5.0 0.7 0.3 0
[3] 4 5.0 0.7 0.3 0
[4] 5 5.0 0.7 0.3 1
>>> result = graph.loopy_belief_propagation("prior_val", "weight", 2)
>>> result.inspect()
[#] id label prior_val was_labeled
======================================
[0] 1 1.0 0.7 0.3 1
[1] 2 1.0 0.7 0.3 1
[2] 3 5.0 0.7 0.3 0
[3] 4 5.0 0.7 0.3 0
[4] 5 5.0 0.7 0.3 1
<BLANKLINE>
[#] posterior
==============================================
[0] [0.9883347610773112,0.011665238922688819]
[1] [0.9743014865548763,0.025698513445123698]
[2] [0.9396772870897875,0.06032271291021254]
[3] [0.9319529856190276,0.06804701438097235]
[4] [0.8506957305238876,0.1493042694761125]
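For intuition, here is a generic sum-product loopy belief propagation sketch on a pairwise Potts model. The potential (1 for agreeing states, exp(-weight) for disagreeing ones), the synchronous message schedule, and the normalization are all assumptions of this sketch rather than sparktk's documented behavior, so it is not expected to reproduce the posterior values above. The function name is hypothetical, and at most one edge per vertex pair is assumed.

```python
import math


def loopy_belief_propagation(priors, edges, max_iterations=10):
    """priors: {vertex: [p_0, ..., p_k-1]}; edges: [(src, dst, weight)].

    Edges are treated as undirected. Returns {vertex: posterior}.
    """
    k = len(next(iter(priors.values())))
    nbrs = {v: [] for v in priors}
    weight = {}
    for s, d, w in edges:
        nbrs[s].append(d)
        nbrs[d].append(s)
        weight[(s, d)] = weight[(d, s)] = w

    def potts(a, b, w):
        # Matching states get potential 1; mismatches are damped by exp(-w).
        return 1.0 if a == b else math.exp(-w)

    # msgs[(u, v)][x]: message from u to v about v being in state x.
    msgs = {(u, v): [1.0 / k] * k for v in nbrs for u in nbrs[v]}

    for _ in range(max_iterations):
        new_msgs = {}
        for (u, v) in msgs:
            out = []
            for xv in range(k):
                total = 0.0
                for xu in range(k):
                    term = priors[u][xu] * potts(xu, xv, weight[(u, v)])
                    for t in nbrs[u]:
                        if t != v:  # exclude the message coming back from v
                            term *= msgs[(t, u)][xu]
                    total += term
                out.append(total)
            z = sum(out)
            new_msgs[(u, v)] = [m / z for m in out]
        msgs = new_msgs  # all messages updated at once (synchronous schedule)

    # Posterior: prior times all incoming messages, normalized.
    posteriors = {}
    for v in priors:
        belief = list(priors[v])
        for u in nbrs[v]:
            belief = [b * m for b, m in zip(belief, msgs[(u, v)])]
        z = sum(belief)
        posteriors[v] = [b / z for b in belief]
    return posteriors
```

A prior column value such as "0.7 0.3" can be turned into this sketch's input with `[float(x) for x in "0.7 0.3".split()]`. Because the coupling is attractive and every prior favors the first state, every posterior in the example graph ends up favoring that state more strongly than 0.7, which is the same qualitative pattern as the frame above.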
def page_rank(
self, convergence_tolerance=None, reset_probability=None, max_iterations=None)
Page Rank
Page Rank is a popular statistic that ranks vertices based on their connectivity across the whole graph.
Exactly one of convergence_tolerance and max_iterations must be set; whichever is set serves as the termination criterion.
:convergence_tolerance: (Float) If the difference between successive iterations is less than this value, the algorithm terminates. Mutually exclusive with max_iterations.
:reset_probability: (Float) Value of the reset probability in the PageRank algorithm.
:max_iterations: (Int) Maximum number of iterations to run before terminating. Mutually exclusive with convergence_tolerance.
Returns | (Frame): | Frame containing the vertex IDs and their page ranks |
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.page_rank(max_iterations=20)
>>> result.inspect()
[#] id pagerank
=================
[0] 1 0.15
[1] 2 0.1925
[2] 3 0.356125
[3] 4 0.1925
[4] 5 0.313625
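The values in the example appear consistent with the unnormalized update PR(v) = r + (1 - r) * sum(PR(u) / outdeg(u)) over in-neighbors u, with r = 0.15 and no redistribution of rank held by vertices without out-edges. The sketch below assumes exactly that formula and a starting rank of 1.0 per vertex; it is an illustration, not the GraphFrames code, and the function name is hypothetical. It also enforces the "exactly one termination criterion" rule.

```python
def page_rank(vertices, edges, reset_probability=0.15,
              max_iterations=None, convergence_tolerance=None):
    """Unnormalized PageRank; exactly one termination criterion must be set."""
    if (max_iterations is None) == (convergence_tolerance is None):
        raise ValueError("set exactly one of max_iterations and convergence_tolerance")

    out_degree = {v: 0 for v in vertices}
    for s, _ in edges:
        out_degree[s] += 1

    ranks = {v: 1.0 for v in vertices}
    iteration = 0
    while True:
        # Each vertex splits its rank evenly across its out-edges.
        incoming = {v: 0.0 for v in vertices}
        for s, d in edges:
            incoming[d] += ranks[s] / out_degree[s]
        new_ranks = {v: reset_probability + (1.0 - reset_probability) * incoming[v]
                     for v in vertices}
        delta = max([abs(new_ranks[v] - ranks[v]) for v in vertices] or [0.0])
        ranks = new_ranks
        iteration += 1
        if max_iterations is not None and iteration >= max_iterations:
            return ranks
        if convergence_tolerance is not None and delta < convergence_tolerance:
            return ranks
```

Because the example graph is acyclic, the starting value stops mattering after a few iterations: vertex 1 has no in-edges and settles at 0.15, vertices 2 and 4 at 0.15 + 0.85 * 0.05 = 0.1925, and so on, giving the same numbers as the frame above.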
def save(
self, path)
Persists the graph to the given file path
def triangle_count(
self)
Counts the number of triangles each vertex is a part of
Returns | (Frame): | Frame containing the vertex IDs and the number of triangles each vertex belongs to |
>>> vertex_schema = [('id', int)]
>>> edge_schema = [('src', int), ('dst', int)]
>>> vertex_rows = [ [1], [2], [3], [4], [5] ]
>>> edge_rows = [ [1, 2], [1, 3], [2, 3], [1, 4], [4, 5] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.triangle_count()
>>> result.inspect()
[#] count id
==============
[0] 1 1
[1] 1 2
[2] 1 3
[3] 0 4
[4] 0 5
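Per-vertex triangle counts can be sketched by checking, for each vertex, which pairs of its neighbors are also connected; each such pair closes exactly one triangle through that vertex. This assumes edges are undirected and self-loops are ignored; the function name is hypothetical.

```python
from itertools import combinations


def triangle_count(vertices, edges):
    """Number of triangles each vertex participates in."""
    nbrs = {v: set() for v in vertices}
    for s, d in edges:
        if s != d:
            nbrs[s].add(d)
            nbrs[d].add(s)

    counts = {v: 0 for v in vertices}
    for v in vertices:
        # Each unordered pair of connected neighbors closes one triangle with v.
        for u, w in combinations(sorted(nbrs[v]), 2):
            if w in nbrs[u]:
                counts[v] += 1
    return counts
```

On the example edges only the triangle 1-2-3 exists, so vertices 1, 2 and 3 each get a count of 1 and vertices 4 and 5 get 0, as in the frame above.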
def vertex_count(
self)
Returns the number of rows in the vertices frame in this graph
>>> from graphframes import examples
>>> gf = examples.Graphs(tc.sql_context).friends()
>>> from sparktk.graph.graph import Graph
>>> g = Graph(tc, gf)
>>> g.vertex_count()
6
>>> v = tc.frame.create([("a", "Alice", 34),
... ("b", "Bob", 36),
... ("c", "Charlie", 30),
... ("d", "David", 29),
... ("e", "Esther", 32),
... ("f", "Fanny", 36),
... ], ["id", "name", "age"])
>>> e = tc.frame.create([("a", "b", "friend"),
... ("b", "c", "follow"),
... ("c", "b", "follow"),
... ("f", "c", "follow"),
... ("e", "f", "follow"),
... ("e", "d", "friend"),
... ("d", "a", "friend"),
... ("a", "e", "friend")
... ], ["src", "dst", "relationship"])
>>> g2 = tc.graph.create(v, e)
>>> g2.vertex_count()
6
def weighted_degrees(
self, edge_weight, degree_option='undirected', default_weight=0.0)
Degree Calculation
A fundamental quantity in graph analysis is the degree of a vertex: The degree of a vertex is the number of edges adjacent to it.
For a directed edge relation, a vertex has both an out-degree (the number of edges leaving the vertex) and an in-degree (the number of edges entering the vertex).
In the presence of edge weights, vertices can have weighted degrees: the weighted degree of a vertex is the sum of the weights of the edges adjacent to it. Analogously, the weighted in-degree of a vertex is the sum of the weights of the edges entering it, and the weighted out-degree is the sum of the weights of the edges leaving it. If the weight property is missing or empty on a particular edge, the default weight is used.
edge_weight | (String): | Name of the property that contains the edge weight |
degree_option | (String): | Either "in", "out", or "undirected"; describes which edge directions are counted |
default_weight | (Numeric): | Default weight value used if an edge has no value for the edge weight property |
Returns | (Frame): | Frame containing the vertex IDs and their weighted degrees |
>>> vertex_schema = [('id', int), ('label', float)]
>>> edge_schema = [('src', int), ('dst', int), ('weight', int)]
>>> vertex_rows = [ [1, 1], [2, 1], [3, 5], [4, 5], [5, 5] ]
>>> edge_rows = [ [1, 2, 2], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ]
>>> vertex_frame = tc.frame.create(vertex_rows, vertex_schema)
>>> edge_frame = tc.frame.create(edge_rows, edge_schema)
>>> graph = tc.graph.create(vertex_frame, edge_frame)
>>> result = graph.weighted_degrees(edge_weight="weight", degree_option="out")
>>> result.inspect()
[#] id degree
===============
[0] 1 4
[1] 2 1
[2] 3 0
[3] 4 1
[4] 5 0
>>> result = graph.weighted_degrees(edge_weight="weight", degree_option="in")
>>> result.inspect()
[#] id degree
===============
[0] 1 0
[1] 2 2
[2] 3 2
[3] 4 1
[4] 5 1
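Weighted degrees follow the same pattern as plain degrees, summing weights instead of counting edges. In this sketch a missing weight is represented as `None` and replaced by `default_weight`; that representation, and the function name, are assumptions for illustration only.

```python
def weighted_degrees(vertices, edges, degree_option="undirected", default_weight=0.0):
    """Sum edge weights per vertex; edges are (src, dst, weight) tuples."""
    if degree_option not in ("in", "out", "undirected"):
        raise ValueError('degree_option must be "in", "out" or "undirected"')
    totals = {v: 0.0 for v in vertices}
    for src, dst, weight in edges:
        if weight is None:  # missing weight on this edge
            weight = default_weight
        if degree_option in ("out", "undirected"):
            totals[src] += weight  # weight of an edge leaving src
        if degree_option in ("in", "undirected"):
            totals[dst] += weight  # weight of an edge entering dst
    return totals
```

With the example weights, vertex 1's out-degree is 2 + 1 + 1 = 4 and vertex 3's in-degree is 1 + 1 = 2, matching the frames above.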