1 module libsweatyballs.link.core;
2 
3 import libsweatyballs.link.message.core;
4 import libsweatyballs.link.unit : LinkUnit;
5 import core.sync.mutex : Mutex;
6 import libsweatyballs.engine.core : Engine;
7 import core.thread : Thread;
8 import bmessage;
9 import std.socket;
10 import gogga;
11 import std.conv : to;
12 import google.protobuf;
13 import libsweatyballs.router.table : Route;
14 import libsweatyballs.zwitch.neighbor : Neighbor;
15 import std.string : cmp;
16 
17 /**
18 * Link
19 *
20 * Description: Represents a "pipe" whereby different protocol messages can be transported over.
21 * Such protocol messages include data packet transport (receiving and sending) along with
22 * router advertisements messages
23 *
24 * This class handles the Message queues for sending and receiving messages (and partially decoding them)
25 */
26 public final class Link : Thread
27 {
28     /**
29     * In and out queues
30     */
31     private LinkUnit[] inQueue;
32     private LinkUnit[] outQueue;
33     private Mutex inQueueLock;
34     private Mutex outQueueLock;
35 
36     private string interfaceName;
37 
38 
39     private Engine engine;
40 
41 
42     private Watcher advWatch;
43     private Watcher neighWatch;
44 
45     this(string interfaceName, Engine engine)
46     {
47         /* Set the thread's worker function */
48         super(&worker);
49 
50         this.interfaceName = interfaceName;
51         this.engine = engine;
52 
53         /* Initialize locks */
54         initMutexes();
55 
56         /* Setup watchers */
57         setupWatchers();
58     }
59 
60     public string getInterface()
61     {
62         return interfaceName;
63     }
64 
65     /**
66     * Initialize the queue mutexes
67     */
68     private void initMutexes()
69     {
70         inQueueLock = new Mutex();
71         outQueueLock = new Mutex();
72     }
73 
74     /**
75     * Sets up watchers, one for advertisements and one for
76     * nieghbor-to-neighbor communications
77     */
78     private void setupWatchers()
79     {
80         /* Setup the advertisement socket (bound to ff02::1%interface) port 6666 */
81         Socket mcastSock = new Socket(AddressFamily.INET6, SocketType.DGRAM, ProtocolType.UDP);
82         mcastSock.bind(parseAddress("ff02::1%"~getInterface(), 6666));
83 
84         /* Setup the advertisement watcher */
85         advWatch = new Watcher(this, mcastSock);
86 
87 
88         /* Setup the router-to-router socket (bound to ::) port 6667 */
89         Socket neighSock = new Socket(AddressFamily.INET6, SocketType.DGRAM, ProtocolType.UDP);
90         neighSock.bind(parseAddress("::", 0));
91 
92         /* Setup the neighbor watcher */
93         neighWatch = new Watcher(this, neighSock);
94     }
95 
96     /**
97     * Returns the router-to-router port being used for this link
98     */
99     public ushort getR2RPort()
100     {
101         return neighWatch.getPort();
102     }
103 
104     /**
105     * Listens for advertisements
106     *
107     * TODO: We also must listen for traffic here though
108     */
109     private void worker()
110     {
111         while(true)
112         {
113 
114             /**
115             * Check if there are any LinkUnit's to be processed
116             * then process them
117             */
118             // gprintln(hasInQueue());
119         
120             if(hasInQueue())
121             {
122                 /* Pop off a message */
123                 LinkUnit unit = popInQueue();
124 
125                 /* Process message */
126                 process(unit); 
127 
128                 gprintln("Pablo");
129             }
130 
131             
132         }
133     }
134 
135 
136     /**
137     * Given Address we take the IP address (not source port of mcast packet)
138     * and then also the `nieghborPort` and spit out a new Address
139     */
140     private static Address getNeighborIPAddress(Address sender, ushort neighborPort)
141     {
142         /* IPv6 reachable neighbor socket */
143         Address neighborAddress = parseAddress(sender.toAddrString(), neighborPort);
144 
145         return neighborAddress;
146     }
147 
148     /**
149     * This will process the message
150     *
151     * Handles message type: SESSION, ADVERTISEMENT
152     */
153     private void process(LinkUnit unit)
154     {
155         /**
156         * Message details
157         *
158         * 1. Public key of Link Neighbor
159         * 2. Signature of Link Neighbor
160         * 3. Neighbor port of LinkNeighbor
161         * 4. Message type (also from LinkUnit address)
162         * 5. Payload
163         */
164         link.LinkMessage message = unit.getMessage();
165         link.LinkMessageType mType = message.type;
166         Address sender = unit.getSender();
167         string identity = message.publicKey;
168         ushort neighborPort = to!(ushort)(message.neighborPort);
169         ubyte[] msgPayload = message.payload;
170 
171 
172         gprintln("Processing message from "~to!(string)(sender)~
173                 " of type "~to!(string)(mType));
174         gprintln("Public key: "~identity);
175         gprintln("Signature: Not yet implemented");
176         gprintln("Neighbor Port: "~to!(string)(neighborPort));
177 
178 
179 
180         /**
181         * Enter the Neighbor details into the Switch
182         */
183         Address neighborAddress = getNeighborIPAddress(sender, neighborPort);
184         Neighbor neighbor = new Neighbor(identity, neighborAddress, this);
185         engine.getSwitch().addNeighbor(neighbor);
186 
187 
188 
189 
190         /* Handle route advertisements */
191         if(mType == LinkMessageType.ADVERTISEMENT)
192         {
193             Advertisement advMsg = fromProtobuf!(link.Advertisement)(msgPayload);
194 
195             /* Get the routes being advertised */
196             RouteEntry[] routes = advMsg.routes;
197             gprintln("Total of "~to!(string)(routes.length)~" received");
198 
199             /* TODO: Do router2router verification here */
200 
201             /* Add each route to the routing table */
202             foreach(RouteEntry route; routes)
203             {
204                 uint metric = route.metric;
205 
206                 /**
207                 * Create a new route with `nexthop` as the nexthop address
208                 * Also set its metric to whatever it is +64
209                 */
210                 Route newRoute = new Route(route.address, neighbor, 100, metric+64);
211 
212                 gprintln(route.address);
213                 gprintln(engine.getRouter().getIdentity().getKeys().publicKey);
214 
215                 /**
216                 * Don't add routes to oneself
217                 */
218                 if(cmp(route.address, engine.getRouter().getIdentity().getKeys().publicKey) != 0)
219                 {
220                     /**
221                     * Don;t add routes we advertised (from ourself) - this
222                     * ecludes self route checked before entering here
223                     */
224                     if(newRoute.getNexthop().getIdentity() != engine.getRouter().getIdentity().getKeys().publicKey)
225                     {
226                         /**
227                         * TODO: Found it, only install routes if their updated metric on arrival is lesser than current route
228                         * TODO: Search for existing route
229                         * TODO: This might make above constraints nmeaningless, or well atleast the one above me (outer one not me thinks)
230                         */
231                         Route possibleExistingRoute = engine.getRouter().getTable().lookup(route.address);
232 
233                         /* If no route exists then add it */
234                         if(!possibleExistingRoute)
235                         {
236                             engine.getRouter().getTable().addRoute(newRoute);
237                         }
238                         /* If a route exists only install it if current one has higher metric than advertised one */
239                         else
240                         {
241                             if(possibleExistingRoute.getMetric() > newRoute.getMetric())
242                             {
243                                 /* Remove the old one (higher metric than advertised route) */
244                                 engine.getRouter().getTable().removeRoute(possibleExistingRoute);
245 
246                                 /* Install the advertised route (lower metric than currently installed route) */
247                                 engine.getRouter().getTable().addRoute(newRoute);
248                             }
249                         }
250                         
251                     }
252                     else
253                     {
254                         gprintln("Not adding a route that originated from us", DebugType.ERROR);
255                     }
256                 
257                 }
258                 else
259                 {
260                     gprintln("Skipping addition of self-route", DebugType.WARNING);
261                 }
262             }
263         }
264         /* Handle session messages */
265         else if(mType == LinkMessageType.PACKET)
266         {
267             gprintln("Woohoo! PACKET received!", DebugType.WARNING);
268 
269             try
270             {
271 
272                 /* TODO: Now check if destined to me, if so THEN attempt decrypt */
273                 link.Packet packet = fromProtobuf!(link.Packet)(msgPayload);
274                 gprintln("Payload (encrypted): "~to!(string)(packet.payload));
275 
276                 /* Attempt decrypting with my key */
277                 import crypto.rsa;
278                 import std.string : cmp;
279 
280                 /* TODO: Make sure decryotion passes, maybe add a PayloadBytes thing to use that */
281                 ubyte[] decryptedPayload = RSA.decrypt(engine.getRouter().getIdentity().getKeys().privateKey, packet.payload);
282                 gprintln("Payload (decrypted): "~cast(string)(decryptedPayload));
283 
284                 /* Now see if it is destined to us */
285 
286                 /* If it is destined to us (TODO: Accept it) */
287                 if(cmp(packet.toKey, engine.getRouter().getIdentity().getKeys().publicKey) == 0)
288                 {
289                     gprintln("PACKET IS ACCEPTED TO ME", DebugType.WARNING);
290 
291                     bool stat = engine.getSwitch().isNeighbour(packet.fromKey) !is null;
292                     gprintln("WasPacketFromNeighbor: "~to!(string)(stat), DebugType.WARNING);
293                 }
294                 /* If it is not destined to me then forward it */
295                 else
296                 {
297                     engine.getSwitch().forward(packet);
298                     gprintln("PACKET IS FORWRDED", DebugType.WARNING);
299                 }
300             
301             }
302             catch(ProtobufException)
303             {
304                 gprintln("Failed to deserialize protobuff bytes", DebugType.ERROR);
305             }
306             
307 
308            
309             
310             
311         }
312         /* TODO: Does protobuf throw en error if so? */
313         else
314         {
315             assert(false);
316         }
317     }
318 
319     public static LinkMessage decode(byte[] data)
320     {
321         try
322         {
323             ubyte[] dataIn = cast(ubyte[])data;
324             LinkMessage message = fromProtobuf!(LinkMessage)(dataIn);
325             return message;
326         }
327         catch(ProtobufException)
328         {
329             return null;
330         }
331     }
332 
333     public void enqueueIn(LinkUnit unit)
334     {
335         /* Add to the in-queue */
336         inQueueLock.lock();
337         inQueue ~= unit;
338         inQueueLock.unlock();
339     }
340 
341     public bool hasInQueue()
342     {
343         bool status;
344         inQueueLock.lock();
345         status = inQueue.length != 0;
346         inQueueLock.unlock();
347         return status;
348     }
349 
350     public LinkUnit popInQueue()
351     {
352         LinkUnit message;
353 
354         /* TODO: Throw exception on `hasInQueue()` false */
355 
356         inQueueLock.lock();
357 
358         /* Pop the message */
359         message = inQueue[0];
360 
361         if(inQueue.length == 1)
362         {
363             inQueue.length = 0;
364         }
365         else
366         {
367             inQueue = inQueue[1..inQueue.length];
368         }
369 
370         inQueueLock.unlock();
371 
372         return message;
373     }
374 
375     // public bool hasOutQueue()
376     // {
377     //     bool status;
378     //     inQueueLock.lock();
379     //     status = inQueue.length != 0;
380     //     inQueueLock.unlock();
381     //     return status;
382     // }
383 
384     
385 
386 
387     public void launch()
388     {
389         start();
390 
391         advWatch.start();
392         neighWatch.start();
393     }
394 
395 
396 
397     /**
398     * Blocks to receive one message from the incoming queue
399     */
400     public LinkMessage receive()
401     {
402         /* TODO: Implement me */
403         return null;
404     }
405 
406     /**
407     * Sends a message
408     */
409     public void send(LinkMessage message, string recipient)
410     {
411         /* TODO: Implement me */
412     }
413 }
414 
415 /**
416 * Watcher
417 *
418 * Given a socket this will dequeue message, decode them and pass
419 * them up to the Link for processing
420 */
421 public final class Watcher : Thread
422 {
423     private Socket socket;
424     private Link link;
425 
426     this(Link link,  Socket socket)
427     {
428         super(&worker);
429         this.link = link;
430         this.socket = socket;
431     }
432 
433     public ushort getPort()
434     {
435         return to!(ushort)(socket.localAddress.toPortString());
436     }
437 
438     /**
439     * Listens for advertisements
440     *
441     * TODO: We also must listen for traffic here though
442     */
443     private void worker()
444     {
445         while(true)
446         {
447 
448             /**
449             * MSG_PEEK, we don't want to dequeue this message yet but need to call receive
450             * MSG_TRUNC, return the number of bytes of the datagram even when
451             * bigger than passed in array
452             */
453             SocketFlags flags;
454             flags |= MSG_TRUNC;
455             flags |= MSG_PEEK;
456 
457             /* Receive buffer */
458             byte[] data;
459             Address address;
460 
461             /* Empty array won't work */
462             data.length = 1;
463             
464             gprintln("Awaiting message...");
465             long len = socket.receiveFrom(data, flags, address);
466 
467             if(len <= 0)
468             {
469                 /* TODO: Error handling */
470             }
471             else
472             {
473                 /* Receive at the length found */
474                 data.length = len;
475                 socket.receiveFrom(data, address);
476 
477                 /* Decode the message */
478                 LinkMessage message = Link.decode(data);
479 
480                 /* If decoding worked */
481                 if(message)
482                 {
483                     /* Couple Address-and-message */
484                     LinkUnit unit = new LinkUnit(address, message);
485 
486                     /* Process message */
487                     link.enqueueIn(unit); 
488                 }
489                 /* If ProtocolBuffer decoding failed */
490                 else
491                 {
492                     gprintln("Watcher: ProtocolBuffer decode failed", DebugType.ERROR);
493                 }
494             }
495             
496         }
497     }
498 }