{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true }, "outputs": [], "source": [ "# Please enter your names\n", "name = \"Fabian Langer, Yannik Bretschneider\"\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Programming Exercise: Minimum Spanning Tree Clustering\n", "Implement the MST clustering below. A basic datastructure is already defined as well as a function for plotting the result. \n" ] }, { "cell_type": "code", "execution_count": 177, "metadata": { "collapsed": true }, "outputs": [], "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "\n", "# Tell matplotlib to plot inside the Notebook\n", "%matplotlib inline\n", "\n", "class Point:\n", " x = 0\n", " y = 0\n", " cluster = 0\n", " def __init__(self, x = 0, y = 0, c = 0):\n", " self.x = x\n", " self.y = y\n", " self.cluster = c\n", " def distanceTo(self,point):\n", " distance = ((self.x-point.x)**2 + (self.y-point.y)**2)**(0.5)\n", " return distance\n", " \n", " def __repr__(self):\n", " return f\"Point(x={self.x}, y={self.y}, cluster={self.cluster})\"\n", " \n", " def __eq__(self, value):\n", " return self.x == value.x and self.y == value.y and self.cluster == value.cluster\n", "\n", "def plot(points):\n", " x = []\n", " y = []\n", " cl = []\n", " for p in points:\n", " x.append(p.x)\n", " y.append(p.y)\n", " cl.append(p.cluster)\n", "\n", " plt.suptitle('MST Clustering')\n", " # Our colors for plotting the data points\n", " scl = list(set(cl)) # reduce to unique colors by converting to a set\n", " colors = np.random.rand(len(scl))\n", " colormap = []\n", " for icl in cl:\n", " for i in range(0, len(scl)):\n", " if icl == scl[i]:\n", " colormap.append(colors[i])\n", " break\n", " # Plot the data\n", " plt.scatter(x, y, c=colormap, s=40)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Implement the Minimum Spanning Tree Clustering below. You will need some sort of data structure to represent a graph and it nodes. " ] }, { "cell_type": "code", "execution_count": 178, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from random import Random\n", "\n", "\n", "class Tree:\n", "\n", " root: int # index of the root node\n", " edges: list[tuple[int, int]] # list of edges (tuples of indices)\n", " points: list[Point] # list of points\n", " visited: list[bool] # list of visited nodes\n", "\n", " def __init__(self, points: list[Point], root: int | None = None):\n", " self.edges = []\n", " self.points = points\n", " self.visited = [False] * len(points)\n", " if root is None:\n", " print(len(points) - 1)\n", " self.root = Random().randint(0, len(points) - 1)\n", " else:\n", " self.root = root\n", " \n", " def add_edge(self, edge: tuple[int, int]):\n", " assert edge[0] < len(self.points) and edge[1] < len(self.points) and edge[0] >= 0 and edge[1] >= 0\n", " self.edges.append(edge)\n", " \n", " def not_visited(self, index: int) -> bool:\n", " return not self.visited[index]\n", " \n", " def iter_not_visited(self):\n", " for i in range(len(self.visited)):\n", " if self.not_visited(i):\n", " yield i\n", " \n", " def iter_visited(self):\n", " for i in range(len(self.visited)):\n", " if not self.not_visited(i):\n", " yield i\n", " \n", " def find_nearest_unvisited_to(self, index: int) -> tuple[int, float]: # o(n)\n", " nearest = None\n", " min_distance = float(\"inf\")\n", " for p in self.iter_not_visited():\n", " distance = self.points[index].distanceTo(self.points[p])\n", " if distance < min_distance:\n", " nearest = p\n", " min_distance = distance\n", " return nearest, min_distance\n", " \n", " def find_nearest_unvisited(self) -> tuple[int, float]: # o(n^2)\n", " nearest = None\n", " min_distance = float(\"inf\")\n", " for p in self.iter_visited():\n", " n, distance = self.find_nearest_unvisited_to(p)\n", " if distance < min_distance:\n", " nearest = n\n", " min_distance = distance\n", " return nearest, min_distance\n", " \n", " def find_longest_edge(self) -> tuple[int, int]:\n", " longest = None\n", " max_distance = 0\n", " for edge in self.edges:\n", " distance = self.points[edge[0]].distanceTo(self.points[edge[1]])\n", " if distance > max_distance:\n", " longest = edge\n", " max_distance = distance\n", " return longest\n", " \n", " def find_all_connected(self, index: int) -> list[int]:\n", " connected = [index]\n", " connected_len = 1\n", " prev_len = 0\n", " while connected_len != prev_len:\n", " prev_len = connected_len\n", " for edge in self.edges:\n", " if edge[0] in connected and edge[1] not in connected:\n", " connected.append(edge[1])\n", " elif edge[1] in connected and edge[0] not in connected:\n", " connected.append(edge[0])\n", " connected_len = len(connected)\n", " return connected\n", " \n", " \n", "def mst(points: list[Point]) -> Tree:\n", " tree = Tree(points)\n", " tree.visited[tree.root] = True\n", " for i in range(len(points) - 1):\n", " nearest, distance = tree.find_nearest_unvisited()\n", " if nearest is not None:\n", " tree.add_edge((tree.root, nearest))\n", " tree.visited[nearest] = True\n", " tree.root = nearest\n", " else: \n", " raise ValueError(\"No unvisited points left\")\n", " return tree\n", "\n", "def cluster(points: list[Point], k: int) -> list[Point]:\n", "\n", " print(\"Number of points: \", len(points))\n", " print(\"Building MST...\")\n", " tree = mst(points)\n", " print(\"MST edge count: \", len(tree.edges))\n", "\n", " for _ in range(k-1):\n", " longest_edge = tree.find_longest_edge()\n", " tree.edges.remove(longest_edge)\n", " \n", "\n", " # cluster 0 doesn't exist, starting from 1\n", "\n", " cluster_id = 1\n", "\n", " print(\"Clustering...\")\n", " while True:\n", " # go through all points connected to the first unclustered point\n", " unclustered = None\n", " for i, pt in enumerate(tree.points):\n", " if pt.cluster == 0:\n", " unclustered = i\n", " break\n", " \n", " if unclustered is None:\n", " break\n", " \n", " print(\"Cluster ID: \", cluster_id)\n", " \n", " print(\"Unclustered point: \", tree.points[unclustered].x, tree.points[unclustered].y)\n", " print(\"Finding connected points...\")\n", " connected = tree.find_all_connected(unclustered)\n", " print(\"Connected points: \", len(connected))\n", " for pt in connected:\n", " tree.points[pt].cluster = cluster_id\n", " cluster_id += 1\n", " \n", "\n", " return tree.points" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "*Note*: Please download spiral.txt and upload it into your *work* folder here on Jupyterhub.\n", "\n", "Now we setup the parameters:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "K = 6" ] }, { "cell_type": "code", "execution_count": 179, "metadata": {}, "outputs": [], "source": [ "K=3" ] }, { "cell_type": "code", "execution_count": 180, "metadata": { "collapsed": true }, "outputs": [], "source": [ "filepath = \"./clustering_datasets/\"\n", "filenames = [\"spiral\"]\n", "fileextension = \".txt\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": 181, "metadata": { "collapsed": true }, "outputs": [], "source": [ "#TODO Think about a good criterium/parameter to determine the clusters" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we can cluster the data using MST-Clustering:" ] }, { "cell_type": "code", "execution_count": 182, "metadata": { "collapsed": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of points: 312\n", "Building MST...\n", "311\n", "MST edge count: 311\n", "Clustering...\n", "Cluster ID: 1\n", "Unclustered point: 31.95 7.95\n", "Finding connected points...\n", "Connected points: 106\n", "Cluster ID: 2\n", "Unclustered point: 19.35 31.65\n", "Finding connected points...\n", "Connected points: 101\n", "Cluster ID: 3\n", "Unclustered point: 3.9 9.6\n", "Finding connected points...\n", "Connected points: 105\n", "Finished clustering dataset: spiral\n" ] } ], "source": [ "results = {}\n", "\n", "for p in range(0,len(filenames)):\n", " datapoints = []\n", " f = filepath+filenames[p]+fileextension\n", " file = open(f,\"r\")\n", " line = file.readline()\n", " while(line != \"\"):\n", " a = line[:-1].split()\n", " x = float(a[0])\n", " y = float(a[1])\n", " datapoints.append(Point(x,y))\n", " line = file.readline()\n", " file.close()\n", " points = cluster(datapoints, k=K)\n", " results[filenames[p]] = points\n", " print(\"Finished clustering dataset: \" + filenames[p])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And visualize the result: (You might want to execute the plot function a couple of times if the random colors are too similar.)" ] }, { "cell_type": "code", "execution_count": 183, "metadata": { "collapsed": true }, "outputs": [ { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "plot(results[\"spiral\"])" ] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.7" } }, "nbformat": 4, "nbformat_minor": 1 }