1 module libsweatyballs.link.core; 2 3 import libsweatyballs.link.message.core; 4 import libsweatyballs.link.unit : LinkUnit; 5 import core.sync.mutex : Mutex; 6 import libsweatyballs.engine.core : Engine; 7 import core.thread : Thread; 8 import bmessage; 9 import std.socket; 10 import gogga; 11 import std.conv : to; 12 import google.protobuf; 13 import libsweatyballs.router.table : Route; 14 import libsweatyballs.zwitch.neighbor : Neighbor; 15 import std.string : cmp; 16 17 /** 18 * Link 19 * 20 * Description: Represents a "pipe" whereby different protocol messages can be transported over. 21 * Such protocol messages include data packet transport (receiving and sending) along with 22 * router advertisements messages 23 * 24 * This class handles the Message queues for sending and receiving messages (and partially decoding them) 25 */ 26 public final class Link : Thread 27 { 28 /** 29 * In and out queues 30 */ 31 private LinkUnit[] inQueue; 32 private LinkUnit[] outQueue; 33 private Mutex inQueueLock; 34 private Mutex outQueueLock; 35 36 private string interfaceName; 37 38 39 private Engine engine; 40 41 42 private Watcher advWatch; 43 private Watcher neighWatch; 44 45 this(string interfaceName, Engine engine) 46 { 47 /* Set the thread's worker function */ 48 super(&worker); 49 50 this.interfaceName = interfaceName; 51 this.engine = engine; 52 53 /* Initialize locks */ 54 initMutexes(); 55 56 /* Setup watchers */ 57 setupWatchers(); 58 } 59 60 public string getInterface() 61 { 62 return interfaceName; 63 } 64 65 /** 66 * Initialize the queue mutexes 67 */ 68 private void initMutexes() 69 { 70 inQueueLock = new Mutex(); 71 outQueueLock = new Mutex(); 72 } 73 74 /** 75 * Sets up watchers, one for advertisements and one for 76 * nieghbor-to-neighbor communications 77 */ 78 private void setupWatchers() 79 { 80 /* Setup the advertisement socket (bound to ff02::1%interface) port 6666 */ 81 Socket mcastSock = new Socket(AddressFamily.INET6, SocketType.DGRAM, ProtocolType.UDP); 82 mcastSock.bind(parseAddress("ff02::1%"~getInterface(), 6666)); 83 84 /* Setup the advertisement watcher */ 85 advWatch = new Watcher(this, mcastSock); 86 87 88 /* Setup the router-to-router socket (bound to ::) port 6667 */ 89 Socket neighSock = new Socket(AddressFamily.INET6, SocketType.DGRAM, ProtocolType.UDP); 90 neighSock.bind(parseAddress("::", 0)); 91 92 /* Setup the neighbor watcher */ 93 neighWatch = new Watcher(this, neighSock); 94 } 95 96 /** 97 * Returns the router-to-router port being used for this link 98 */ 99 public ushort getR2RPort() 100 { 101 return neighWatch.getPort(); 102 } 103 104 /** 105 * Listens for advertisements 106 * 107 * TODO: We also must listen for traffic here though 108 */ 109 private void worker() 110 { 111 while(true) 112 { 113 114 /** 115 * Check if there are any LinkUnit's to be processed 116 * then process them 117 */ 118 // gprintln(hasInQueue()); 119 120 if(hasInQueue()) 121 { 122 /* Pop off a message */ 123 LinkUnit unit = popInQueue(); 124 125 /* Process message */ 126 process(unit); 127 128 gprintln("Pablo"); 129 } 130 131 132 } 133 } 134 135 136 /** 137 * Given Address we take the IP address (not source port of mcast packet) 138 * and then also the `nieghborPort` and spit out a new Address 139 */ 140 private static Address getNeighborIPAddress(Address sender, ushort neighborPort) 141 { 142 /* IPv6 reachable neighbor socket */ 143 Address neighborAddress = parseAddress(sender.toAddrString(), neighborPort); 144 145 return neighborAddress; 146 } 147 148 /** 149 * This will process the message 150 * 151 * Handles message type: SESSION, ADVERTISEMENT 152 */ 153 private void process(LinkUnit unit) 154 { 155 /** 156 * Message details 157 * 158 * 1. Public key of Link Neighbor 159 * 2. Signature of Link Neighbor 160 * 3. Neighbor port of LinkNeighbor 161 * 4. Message type (also from LinkUnit address) 162 * 5. Payload 163 */ 164 link.LinkMessage message = unit.getMessage(); 165 link.LinkMessageType mType = message.type; 166 Address sender = unit.getSender(); 167 string identity = message.publicKey; 168 ushort neighborPort = to!(ushort)(message.neighborPort); 169 ubyte[] msgPayload = message.payload; 170 171 172 gprintln("Processing message from "~to!(string)(sender)~ 173 " of type "~to!(string)(mType)); 174 gprintln("Public key: "~identity); 175 gprintln("Signature: Not yet implemented"); 176 gprintln("Neighbor Port: "~to!(string)(neighborPort)); 177 178 179 180 /** 181 * Enter the Neighbor details into the Switch 182 */ 183 Address neighborAddress = getNeighborIPAddress(sender, neighborPort); 184 Neighbor neighbor = new Neighbor(identity, neighborAddress, this); 185 engine.getSwitch().addNeighbor(neighbor); 186 187 188 189 190 /* Handle route advertisements */ 191 if(mType == LinkMessageType.ADVERTISEMENT) 192 { 193 Advertisement advMsg = fromProtobuf!(link.Advertisement)(msgPayload); 194 195 /* Get the routes being advertised */ 196 RouteEntry[] routes = advMsg.routes; 197 gprintln("Total of "~to!(string)(routes.length)~" received"); 198 199 /* TODO: Do router2router verification here */ 200 201 /* Add each route to the routing table */ 202 foreach(RouteEntry route; routes) 203 { 204 uint metric = route.metric; 205 206 /** 207 * Create a new route with `nexthop` as the nexthop address 208 * Also set its metric to whatever it is +64 209 */ 210 Route newRoute = new Route(route.address, neighbor, 100, metric+64); 211 212 gprintln(route.address); 213 gprintln(engine.getRouter().getIdentity().getKeys().publicKey); 214 215 /** 216 * Don't add routes to oneself 217 */ 218 if(cmp(route.address, engine.getRouter().getIdentity().getKeys().publicKey) != 0) 219 { 220 /** 221 * Don;t add routes we advertised (from ourself) - this 222 * ecludes self route checked before entering here 223 */ 224 if(newRoute.getNexthop().getIdentity() != engine.getRouter().getIdentity().getKeys().publicKey) 225 { 226 /** 227 * TODO: Found it, only install routes if their updated metric on arrival is lesser than current route 228 * TODO: Search for existing route 229 * TODO: This might make above constraints nmeaningless, or well atleast the one above me (outer one not me thinks) 230 */ 231 Route possibleExistingRoute = engine.getRouter().getTable().lookup(route.address); 232 233 /* If no route exists then add it */ 234 if(!possibleExistingRoute) 235 { 236 engine.getRouter().getTable().addRoute(newRoute); 237 } 238 /* If a route exists only install it if current one has higher metric than advertised one */ 239 else 240 { 241 if(possibleExistingRoute.getMetric() > newRoute.getMetric()) 242 { 243 /* Remove the old one (higher metric than advertised route) */ 244 engine.getRouter().getTable().removeRoute(possibleExistingRoute); 245 246 /* Install the advertised route (lower metric than currently installed route) */ 247 engine.getRouter().getTable().addRoute(newRoute); 248 } 249 } 250 251 } 252 else 253 { 254 gprintln("Not adding a route that originated from us", DebugType.ERROR); 255 } 256 257 } 258 else 259 { 260 gprintln("Skipping addition of self-route", DebugType.WARNING); 261 } 262 } 263 } 264 /* Handle session messages */ 265 else if(mType == LinkMessageType.PACKET) 266 { 267 gprintln("Woohoo! PACKET received!", DebugType.WARNING); 268 269 try 270 { 271 272 /* TODO: Now check if destined to me, if so THEN attempt decrypt */ 273 link.Packet packet = fromProtobuf!(link.Packet)(msgPayload); 274 gprintln("Payload (encrypted): "~to!(string)(packet.payload)); 275 276 /* Attempt decrypting with my key */ 277 import crypto.rsa; 278 import std.string : cmp; 279 280 /* TODO: Make sure decryotion passes, maybe add a PayloadBytes thing to use that */ 281 ubyte[] decryptedPayload = RSA.decrypt(engine.getRouter().getIdentity().getKeys().privateKey, packet.payload); 282 gprintln("Payload (decrypted): "~cast(string)(decryptedPayload)); 283 284 /* Now see if it is destined to us */ 285 286 /* If it is destined to us (TODO: Accept it) */ 287 if(cmp(packet.toKey, engine.getRouter().getIdentity().getKeys().publicKey) == 0) 288 { 289 gprintln("PACKET IS ACCEPTED TO ME", DebugType.WARNING); 290 291 bool stat = engine.getSwitch().isNeighbour(packet.fromKey) !is null; 292 gprintln("WasPacketFromNeighbor: "~to!(string)(stat), DebugType.WARNING); 293 } 294 /* If it is not destined to me then forward it */ 295 else 296 { 297 engine.getSwitch().forward(packet); 298 gprintln("PACKET IS FORWRDED", DebugType.WARNING); 299 } 300 301 } 302 catch(ProtobufException) 303 { 304 gprintln("Failed to deserialize protobuff bytes", DebugType.ERROR); 305 } 306 307 308 309 310 311 } 312 /* TODO: Does protobuf throw en error if so? */ 313 else 314 { 315 assert(false); 316 } 317 } 318 319 public static LinkMessage decode(byte[] data) 320 { 321 try 322 { 323 ubyte[] dataIn = cast(ubyte[])data; 324 LinkMessage message = fromProtobuf!(LinkMessage)(dataIn); 325 return message; 326 } 327 catch(ProtobufException) 328 { 329 return null; 330 } 331 } 332 333 public void enqueueIn(LinkUnit unit) 334 { 335 /* Add to the in-queue */ 336 inQueueLock.lock(); 337 inQueue ~= unit; 338 inQueueLock.unlock(); 339 } 340 341 public bool hasInQueue() 342 { 343 bool status; 344 inQueueLock.lock(); 345 status = inQueue.length != 0; 346 inQueueLock.unlock(); 347 return status; 348 } 349 350 public LinkUnit popInQueue() 351 { 352 LinkUnit message; 353 354 /* TODO: Throw exception on `hasInQueue()` false */ 355 356 inQueueLock.lock(); 357 358 /* Pop the message */ 359 message = inQueue[0]; 360 361 if(inQueue.length == 1) 362 { 363 inQueue.length = 0; 364 } 365 else 366 { 367 inQueue = inQueue[1..inQueue.length]; 368 } 369 370 inQueueLock.unlock(); 371 372 return message; 373 } 374 375 // public bool hasOutQueue() 376 // { 377 // bool status; 378 // inQueueLock.lock(); 379 // status = inQueue.length != 0; 380 // inQueueLock.unlock(); 381 // return status; 382 // } 383 384 385 386 387 public void launch() 388 { 389 start(); 390 391 advWatch.start(); 392 neighWatch.start(); 393 } 394 395 396 397 /** 398 * Blocks to receive one message from the incoming queue 399 */ 400 public LinkMessage receive() 401 { 402 /* TODO: Implement me */ 403 return null; 404 } 405 406 /** 407 * Sends a message 408 */ 409 public void send(LinkMessage message, string recipient) 410 { 411 /* TODO: Implement me */ 412 } 413 } 414 415 /** 416 * Watcher 417 * 418 * Given a socket this will dequeue message, decode them and pass 419 * them up to the Link for processing 420 */ 421 public final class Watcher : Thread 422 { 423 private Socket socket; 424 private Link link; 425 426 this(Link link, Socket socket) 427 { 428 super(&worker); 429 this.link = link; 430 this.socket = socket; 431 } 432 433 public ushort getPort() 434 { 435 return to!(ushort)(socket.localAddress.toPortString()); 436 } 437 438 /** 439 * Listens for advertisements 440 * 441 * TODO: We also must listen for traffic here though 442 */ 443 private void worker() 444 { 445 while(true) 446 { 447 448 /** 449 * MSG_PEEK, we don't want to dequeue this message yet but need to call receive 450 * MSG_TRUNC, return the number of bytes of the datagram even when 451 * bigger than passed in array 452 */ 453 SocketFlags flags; 454 flags |= MSG_TRUNC; 455 flags |= MSG_PEEK; 456 457 /* Receive buffer */ 458 byte[] data; 459 Address address; 460 461 /* Empty array won't work */ 462 data.length = 1; 463 464 gprintln("Awaiting message..."); 465 long len = socket.receiveFrom(data, flags, address); 466 467 if(len <= 0) 468 { 469 /* TODO: Error handling */ 470 } 471 else 472 { 473 /* Receive at the length found */ 474 data.length = len; 475 socket.receiveFrom(data, address); 476 477 /* Decode the message */ 478 LinkMessage message = Link.decode(data); 479 480 /* If decoding worked */ 481 if(message) 482 { 483 /* Couple Address-and-message */ 484 LinkUnit unit = new LinkUnit(address, message); 485 486 /* Process message */ 487 link.enqueueIn(unit); 488 } 489 /* If ProtocolBuffer decoding failed */ 490 else 491 { 492 gprintln("Watcher: ProtocolBuffer decode failed", DebugType.ERROR); 493 } 494 } 495 496 } 497 } 498 }