1 module libsweatyballs.link.core; 2 3 import libsweatyballs.link.message.core; 4 import libsweatyballs.link.unit : LinkUnit; 5 import core.sync.mutex : Mutex; 6 import libsweatyballs.engine.core : Engine; 7 import core.thread : Thread; 8 import bmessage; 9 import std.socket; 10 import gogga; 11 import std.conv : to; 12 import google.protobuf; 13 import libsweatyballs.router.table : Route; 14 import libsweatyballs.zwitch.neighbor : Neighbor; 15 import std.string : cmp; 16 17 /** 18 * Link 19 * 20 * Description: Represents a "pipe" whereby different protocol messages can be transported over. 21 * Such protocol messages include data packet transport (receiving and sending) along with 22 * router advertisements messages 23 * 24 * This class handles the Message queues for sending and receiving messages (and partially decoding them) 25 */ 26 public final class Link : Thread 27 { 28 /** 29 * In and out queues 30 */ 31 private LinkUnit[] inQueue; 32 private LinkUnit[] outQueue; 33 private Mutex inQueueLock; 34 private Mutex outQueueLock; 35 36 private string interfaceName; 37 38 39 private Engine engine; 40 41 42 private Watcher advWatch; 43 private Watcher neighWatch; 44 45 this(string interfaceName, Engine engine) 46 { 47 /* Set the thread's worker function */ 48 super(&worker); 49 50 this.interfaceName = interfaceName; 51 this.engine = engine; 52 53 /* Initialize locks */ 54 initMutexes(); 55 56 /* Setup watchers */ 57 setupWatchers(); 58 } 59 60 public string getInterface() 61 { 62 return interfaceName; 63 } 64 65 /** 66 * Initialize the queue mutexes 67 */ 68 private void initMutexes() 69 { 70 inQueueLock = new Mutex(); 71 outQueueLock = new Mutex(); 72 handlersLock = new Mutex(); 73 } 74 75 /** 76 * Sets up watchers, one for advertisements and one for 77 * nieghbor-to-neighbor communications 78 */ 79 private void setupWatchers() 80 { 81 /* Setup the advertisement socket (bound to ff02::1%interface) port 6666 */ 82 Socket mcastSock = new Socket(AddressFamily.INET6, SocketType.DGRAM, ProtocolType.UDP); 83 mcastSock.bind(parseAddress("ff02::1%"~getInterface(), 6666)); 84 85 /* Setup the advertisement watcher */ 86 advWatch = new Watcher(this, mcastSock); 87 88 89 /* Setup the router-to-router socket (bound to ::) port 6667 */ 90 Socket neighSock = new Socket(AddressFamily.INET6, SocketType.DGRAM, ProtocolType.UDP); 91 neighSock.bind(parseAddress("::", 0)); 92 93 /* Setup the neighbor watcher */ 94 neighWatch = new Watcher(this, neighSock); 95 } 96 97 /** 98 * Returns the router-to-router port being used for this link 99 */ 100 public ushort getR2RPort() 101 { 102 return neighWatch.getPort(); 103 } 104 105 /** 106 * Listens for advertisements 107 * 108 * TODO: We also must listen for traffic here though 109 */ 110 private void worker() 111 { 112 while(true) 113 { 114 115 /** 116 * Check if there are any LinkUnit's to be processed 117 * then process them 118 */ 119 // gprintln(hasInQueue()); 120 121 if(hasInQueue()) 122 { 123 /* Pop off a message */ 124 LinkUnit unit = popInQueue(); 125 126 /* Process message */ 127 process(unit); 128 129 gprintln("Pablo"); 130 } 131 132 133 } 134 } 135 136 137 /** 138 * Given Address we take the IP address (not source port of mcast packet) 139 * and then also the `nieghborPort` and spit out a new Address 140 */ 141 public static Address getNeighborIPAddress(Address sender, ushort neighborPort) 142 { 143 /* IPv6 reachable neighbor socket */ 144 Address neighborAddress = parseAddress(sender.toAddrString(), neighborPort); 145 146 return neighborAddress; 147 } 148 149 alias LinkUnitHandler = void function (LinkUnit); 150 151 private LinkUnitHandler[ubyte] handlers; 152 private Mutex handlersLock; 153 154 public void registerHandler(LinkUnitHandler funcPtr, ubyte code) 155 { 156 handlersLock.lock(); 157 handlers[code] = funcPtr; 158 handlersLock.unlock(); 159 } 160 161 private LinkUnitHandler defaultHandler; 162 163 public void setDefaultHandler(LinkUnitHandler funcPtr) 164 { 165 defaultHandler = funcPtr; 166 } 167 168 /** 169 * Returns the given handler function associated 170 * with the provided message type. 171 * 172 * If the message type is not found then a default 173 * handler is returned 174 * 175 * TODO: I should work on eventy again and use that 176 * for this project then (next version) 177 */ 178 public LinkUnitHandler getHandler(ubyte code) 179 { 180 LinkUnitHandler handler; 181 182 handlersLock.lock(); 183 184 handler = *(code in handlers); 185 186 if(!handler) 187 { 188 handler = defaultHandler; 189 } 190 191 handlersLock.unlock(); 192 193 194 return handler; 195 } 196 197 /** 198 * This will process the message 199 * 200 * Handles message type: SESSION, ADVERTISEMENT 201 */ 202 private void process(LinkUnit unit) 203 { 204 /** 205 * Message details 206 * 207 * 1. Public key of Link Neighbor 208 * 2. Signature of Link Neighbor 209 * 3. Neighbor port of LinkNeighbor 210 * 4. Message type (also from LinkUnit address) 211 * 5. Payload 212 */ 213 link.LinkMessage message = unit.getMessage(); 214 link.LinkMessageType mType = message.type; 215 Address sender = unit.getSender(); 216 string identity = message.publicKey; 217 ushort neighborPort = to!(ushort)(message.neighborPort); 218 ubyte[] msgPayload = message.payload; 219 220 221 gprintln("Processing message from "~to!(string)(sender)~ 222 " of type "~to!(string)(mType)); 223 gprintln("Public key: "~identity); 224 gprintln("Signature: Not yet implemented"); 225 gprintln("Neighbor Port: "~to!(string)(neighborPort)); 226 227 228 229 230 231 /** 232 * Enter the Neighbor details into the Switch 233 */ 234 Address neighborAddress = getNeighborIPAddress(sender, neighborPort); 235 Neighbor neighbor = new Neighbor(identity, neighborAddress, this); 236 engine.getSwitch().addNeighbor(neighbor); 237 238 239 /** 240 * Get the handler required for the given message type 241 * and call it 242 */ 243 LinkUnitHandler handlerFunc = getHandler(to!(ubyte)(message.type)); 244 handlerFunc(unit); 245 246 247 248 249 /* Handle route advertisements */ 250 if(mType == LinkMessageType.ADVERTISEMENT) 251 { 252 253 } 254 /* Handle session messages */ 255 else if(mType == LinkMessageType.PACKET) 256 { 257 258 } 259 /* TODO: Does protobuf throw en error if so? */ 260 else 261 { 262 assert(false); 263 } 264 } 265 266 public static LinkMessage decode(byte[] data) 267 { 268 try 269 { 270 ubyte[] dataIn = cast(ubyte[])data; 271 LinkMessage message = fromProtobuf!(LinkMessage)(dataIn); 272 return message; 273 } 274 catch(ProtobufException) 275 { 276 return null; 277 } 278 } 279 280 public void enqueueIn(LinkUnit unit) 281 { 282 /* Add to the in-queue */ 283 inQueueLock.lock(); 284 inQueue ~= unit; 285 inQueueLock.unlock(); 286 } 287 288 289 290 public bool hasInQueue() 291 { 292 bool status; 293 inQueueLock.lock(); 294 status = inQueue.length != 0; 295 inQueueLock.unlock(); 296 return status; 297 } 298 299 public LinkUnit popInQueue() 300 { 301 LinkUnit message; 302 303 /* TODO: Throw exception on `hasInQueue()` false */ 304 305 inQueueLock.lock(); 306 307 /* Pop the message */ 308 message = inQueue[0]; 309 310 if(inQueue.length == 1) 311 { 312 inQueue.length = 0; 313 } 314 else 315 { 316 inQueue = inQueue[1..inQueue.length]; 317 } 318 319 inQueueLock.unlock(); 320 321 return message; 322 } 323 324 // public bool hasOutQueue() 325 // { 326 // bool status; 327 // inQueueLock.lock(); 328 // status = inQueue.length != 0; 329 // inQueueLock.unlock(); 330 // return status; 331 // } 332 333 334 335 336 public void launch() 337 { 338 start(); 339 340 advWatch.start(); 341 neighWatch.start(); 342 } 343 344 345 346 /** 347 * Blocks to receive one message from the incoming queue 348 */ 349 public LinkMessage receive() 350 { 351 /* TODO: Implement me */ 352 return null; 353 } 354 355 /** 356 * Sends a message 357 */ 358 public void send(LinkMessage message, string recipient) 359 { 360 /* TODO: Implement me */ 361 } 362 } 363 364 /** 365 * Watcher 366 * 367 * Given a socket this will dequeue message, decode them and pass 368 * them up to the Link for processing 369 */ 370 public final class Watcher : Thread 371 { 372 private Socket socket; 373 private Link link; 374 375 this(Link link, Socket socket) 376 { 377 super(&worker); 378 this.link = link; 379 this.socket = socket; 380 } 381 382 public ushort getPort() 383 { 384 return to!(ushort)(socket.localAddress.toPortString()); 385 } 386 387 /** 388 * Listens for advertisements 389 * 390 * TODO: We also must listen for traffic here though 391 */ 392 private void worker() 393 { 394 while(true) 395 { 396 397 /** 398 * MSG_PEEK, we don't want to dequeue this message yet but need to call receive 399 * MSG_TRUNC, return the number of bytes of the datagram even when 400 * bigger than passed in array 401 */ 402 SocketFlags flags; 403 flags |= MSG_TRUNC; 404 flags |= MSG_PEEK; 405 406 /* Receive buffer */ 407 byte[] data; 408 Address address; 409 410 /* Empty array won't work */ 411 data.length = 1; 412 413 gprintln("Awaiting message..."); 414 long len = socket.receiveFrom(data, flags, address); 415 416 if(len <= 0) 417 { 418 /* TODO: Error handling */ 419 } 420 else 421 { 422 /* Receive at the length found */ 423 data.length = len; 424 socket.receiveFrom(data, address); 425 426 /* Decode the message */ 427 LinkMessage message = Link.decode(data); 428 429 /* If decoding worked */ 430 if(message) 431 { 432 /* Couple Address-and-message */ 433 LinkUnit unit = new LinkUnit(address, message, link); 434 435 /* Process message */ 436 link.enqueueIn(unit); 437 } 438 /* If ProtocolBuffer decoding failed */ 439 else 440 { 441 gprintln("Watcher: ProtocolBuffer decode failed", DebugType.ERROR); 442 } 443 } 444 445 } 446 } 447 }