1 module libsweatyballs.link.core;
2 
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;

import std.conv : to;
import std.socket;
import std.string : cmp;

import bmessage;
import gogga;
import google.protobuf;

import libsweatyballs.engine.core : Engine;
import libsweatyballs.link.message.core;
import libsweatyballs.link.unit : LinkUnit;
import libsweatyballs.router.table : Route;
import libsweatyballs.zwitch.neighbor : Neighbor;
16 
/**
* Link
*
* Description: Represents a "pipe" over which different protocol messages can
* be transported on one network interface. Such protocol messages include data
* packet transport (receiving and sending) along with router advertisement
* messages.
*
* A Link owns two Watcher threads (one per socket) which decode incoming
* datagrams and push them onto the in-queue via `enqueueIn`. The Link's own
* thread drains that queue and dispatches every unit to the handler registered
* for its message type.
*/
public final class Link : Thread
{
    /**
    * Signature of a function that consumes one received LinkUnit
    */
    alias LinkUnitHandler = void function (LinkUnit);

    /**
    * In and out queues
    */
    private LinkUnit[] inQueue;
    private LinkUnit[] outQueue;
    private Mutex inQueueLock;
    private Mutex outQueueLock;

    /* Signalled (while holding `inQueueLock`) whenever a unit is appended to `inQueue` */
    private Condition inQueueReady;

    /* Name of the network interface this link is attached to */
    private string interfaceName;

    private Engine engine;

    /* One watcher per socket: advertisements and neighbor-to-neighbor traffic */
    private Watcher advWatch;
    private Watcher neighWatch;

    /* Message type code -> handler, plus the fallback; all guarded by `handlersLock` */
    private LinkUnitHandler[ubyte] handlers;
    private LinkUnitHandler defaultHandler;
    private Mutex handlersLock;

    /**
    * Constructs a new Link on the given interface. This binds the sockets
    * (see `setupWatchers`) but starts no threads; call `launch` for that.
    *
    * Params:
    *   interfaceName = name of the network interface to attach to
    *   engine = the Engine whose switch receives discovered neighbors
    */
    this(string interfaceName, Engine engine)
    {
        /* Set the thread's worker function */
        super(&worker);

        this.interfaceName = interfaceName;
        this.engine = engine;

        /* Initialize locks */
        initMutexes();

        /* Setup watchers */
        setupWatchers();
    }

    /**
    * Returns the name of the interface this link is attached to
    */
    public string getInterface()
    {
        return interfaceName;
    }

    /**
    * Initialize the queue/handler mutexes and the in-queue condition
    */
    private void initMutexes()
    {
        inQueueLock = new Mutex();
        outQueueLock = new Mutex();
        handlersLock = new Mutex();

        /* The condition shares `inQueueLock` so waiting and queue access are atomic */
        inQueueReady = new Condition(inQueueLock);
    }

    /**
    * Sets up watchers, one for advertisements and one for
    * neighbor-to-neighbor communications
    */
    private void setupWatchers()
    {
        /* Setup the advertisement socket (bound to ff02::1%interface) port 6666 */
        Socket mcastSock = new Socket(AddressFamily.INET6, SocketType.DGRAM, ProtocolType.UDP);
        mcastSock.bind(parseAddress("ff02::1%"~getInterface(), 6666));

        /* Setup the advertisement watcher */
        advWatch = new Watcher(this, mcastSock);


        /**
        * Setup the router-to-router socket (bound to ::). Port 0 is requested,
        * so the port is not fixed; the one actually in use is reported
        * by `getR2RPort`
        */
        Socket neighSock = new Socket(AddressFamily.INET6, SocketType.DGRAM, ProtocolType.UDP);
        neighSock.bind(parseAddress("::", 0));

        /* Setup the neighbor watcher */
        neighWatch = new Watcher(this, neighSock);
    }

    /**
    * Returns the router-to-router port being used for this link
    */
    public ushort getR2RPort()
    {
        return neighWatch.getPort();
    }

    /**
    * Thread body: waits for LinkUnits to arrive on the in-queue and
    * processes them one at a time.
    *
    * A failure while processing one unit is logged and the loop carries on,
    * so a single bad message cannot take the whole link down.
    */
    private void worker()
    {
        while(true)
        {
            /* Sleeps (rather than spins) until a Watcher has queued something */
            LinkUnit unit = awaitInQueue();

            try
            {
                /* Process message */
                process(unit);
            }
            catch(Exception e)
            {
                gprintln("Link: Failed to process message: "~e.msg, DebugType.ERROR);
            }
        }
    }

    /**
    * Blocks until the in-queue is non-empty, then removes and returns
    * its first unit. Checking and popping happen under one lock hold.
    */
    private LinkUnit awaitInQueue()
    {
        inQueueLock.lock();
        scope(exit) inQueueLock.unlock();

        /* Re-check after every wake-up: another caller of `popInQueue` may have won */
        while(inQueue.length == 0)
        {
            inQueueReady.wait();
        }

        LinkUnit unit = inQueue[0];
        inQueue = inQueue[1..$];

        return unit;
    }


    /**
    * Given an Address we take the IP address (not the source port of the
    * mcast packet) and combine it with `neighborPort` into a new Address
    *
    * Params:
    *   sender = address the message was received from
    *   neighborPort = port advertised by the neighbor
    * Returns: an Address with the sender's IP and the given port
    */
    public static Address getNeighborIPAddress(Address sender, ushort neighborPort)
    {
        /* IPv6 reachable neighbor socket */
        return parseAddress(sender.toAddrString(), neighborPort);
    }

    /**
    * Registers (or replaces) the handler for the given message type code
    */
    public void registerHandler(LinkUnitHandler funcPtr, ubyte code)
    {
        handlersLock.lock();
        scope(exit) handlersLock.unlock();

        handlers[code] = funcPtr;
    }

    /**
    * Sets the handler used for message types without a registered handler
    */
    public void setDefaultHandler(LinkUnitHandler funcPtr)
    {
        /* Same lock as `getHandler`, which reads this field */
        handlersLock.lock();
        scope(exit) handlersLock.unlock();

        defaultHandler = funcPtr;
    }

    /**
    * Returns the given handler function associated
    * with the provided message type.
    *
    * If no (non-null) handler is registered for the message type then the
    * default handler is returned, which is itself `null` when none was set.
    *
    * TODO: I should work on eventy again and use that
    * for this project then (next version)
    */
    public LinkUnitHandler getHandler(ubyte code)
    {
        handlersLock.lock();
        scope(exit) handlersLock.unlock();

        /* `in` yields a null pointer for a missing key, so test before dereferencing */
        LinkUnitHandler* registered = code in handlers;

        if(registered !is null && *registered !is null)
        {
            return *registered;
        }

        return defaultHandler;
    }

    /**
    * This will process the message: the sender is entered into the switch
    * as a Neighbor and the unit is then handed to the handler registered
    * for its message type (or the default handler).
    *
    * A unit for which no handler is available is logged and dropped.
    */
    private void process(LinkUnit unit)
    {
        /**
        * Message details
        *
        * 1. Public key of Link Neighbor
        * 2. Signature of Link Neighbor
        * 3. Neighbor port of LinkNeighbor
        * 4. Message type (also from LinkUnit address)
        * 5. Payload
        */
        LinkMessage message = unit.getMessage();
        LinkMessageType mType = message.type;
        Address sender = unit.getSender();
        string identity = message.publicKey;

        /* Throws if the advertised port does not fit a ushort; `worker` logs that */
        ushort neighborPort = to!(ushort)(message.neighborPort);


        gprintln("Processing message from "~to!(string)(sender)~
                " of type "~to!(string)(mType));
        gprintln("Public key: "~identity);
        gprintln("Signature: Not yet implemented");
        gprintln("Neighbor Port: "~to!(string)(neighborPort));


        /**
        * Enter the Neighbor details into the Switch
        */
        Address neighborAddress = getNeighborIPAddress(sender, neighborPort);
        Neighbor neighbor = new Neighbor(identity, neighborAddress, this);
        engine.getSwitch().addNeighbor(neighbor);


        /**
        * Get the handler required for the given message type
        * and call it
        */
        LinkUnitHandler handlerFunc = getHandler(to!(ubyte)(mType));

        /* The type comes off the wire, so an unknown one must not abort the program */
        if(handlerFunc is null)
        {
            gprintln("Link: No handler for message type "~to!(string)(mType)~
                    ", dropping message", DebugType.ERROR);
            return;
        }

        handlerFunc(unit);
    }

    /**
    * Decodes a protobuf-encoded LinkMessage
    *
    * Returns: the decoded message, or `null` if a ProtobufException was raised
    */
    public static LinkMessage decode(byte[] data)
    {
        try
        {
            ubyte[] dataIn = cast(ubyte[])data;
            return fromProtobuf!(LinkMessage)(dataIn);
        }
        catch(ProtobufException)
        {
            return null;
        }
    }

    /**
    * Appends a unit to the in-queue and wakes the worker thread
    */
    public void enqueueIn(LinkUnit unit)
    {
        inQueueLock.lock();
        scope(exit) inQueueLock.unlock();

        /* Add to the in-queue */
        inQueue ~= unit;

        inQueueReady.notify();
    }

    /**
    * Returns `true` if at least one unit is waiting in the in-queue
    */
    public bool hasInQueue()
    {
        inQueueLock.lock();
        scope(exit) inQueueLock.unlock();

        return inQueue.length != 0;
    }

    /**
    * Removes and returns the first unit of the in-queue
    *
    * Throws: Exception if the in-queue is empty
    */
    public LinkUnit popInQueue()
    {
        inQueueLock.lock();

        /* Released on every path, including the throw below */
        scope(exit) inQueueLock.unlock();

        if(inQueue.length == 0)
        {
            throw new Exception("Link: popInQueue() called on an empty in-queue");
        }

        /* Pop the message */
        LinkUnit message = inQueue[0];
        inQueue = inQueue[1..$];

        return message;
    }

    /**
    * Starts the Link's worker thread followed by both watcher threads
    */
    public void launch()
    {
        start();

        advWatch.start();
        neighWatch.start();
    }

    /**
    * Blocks to receive one message from the incoming queue
    *
    * Not implemented yet: currently always returns `null`
    */
    public LinkMessage receive()
    {
        /* TODO: Implement me */
        return null;
    }

    /**
    * Sends a message
    *
    * Not implemented yet: currently does nothing
    */
    public void send(LinkMessage message, string recipient)
    {
        /* TODO: Implement me */
    }
}
363 
/**
* Watcher
*
* Given a socket this will receive datagrams from it, decode them and
* pass them up to the Link (via `enqueueIn`) for processing
*/
public final class Watcher : Thread
{
    /* Largest datagram we accept; UDP's length field is 16 bits wide */
    private enum size_t MAX_DATAGRAM_SIZE = 65_535;

    private Socket socket;
    private Link link;

    /**
    * Constructs a new Watcher (the thread is not started here)
    *
    * Params:
    *   link = the Link that decoded messages are queued on
    *   socket = the socket to receive from
    */
    this(Link link,  Socket socket)
    {
        super(&worker);
        this.link = link;
        this.socket = socket;
    }

    /**
    * Returns the local port the watched socket is bound to
    */
    public ushort getPort()
    {
        return to!(ushort)(socket.localAddress.toPortString());
    }

    /**
    * Thread body: receives datagrams, decodes each into a LinkMessage
    * and queues it (coupled with the sender's address) on the Link.
    *
    * Empty datagrams and datagrams that fail to decode are logged and
    * skipped. The loop ends only once the socket is no longer alive.
    */
    private void worker()
    {
        /**
        * One reusable buffer big enough for any datagram. Receiving straight
        * into it dequeues every datagram, including zero-length ones, which a
        * peek-for-size approach would leave stuck at the head of the socket
        */
        byte[] buffer = new byte[MAX_DATAGRAM_SIZE];

        while(true)
        {
            Address address;

            gprintln("Awaiting message...");
            ptrdiff_t len = socket.receiveFrom(buffer, address);

            /* Receive failure */
            if(len < 0)
            {
                gprintln("Watcher: Receive failed: "~lastSocketError(), DebugType.ERROR);

                /* A dead socket will never recover; stop instead of spinning on it */
                if(!socket.isAlive)
                {
                    gprintln("Watcher: Socket is no longer alive, stopping", DebugType.ERROR);
                    break;
                }

                continue;
            }

            /* Nothing to decode in an empty datagram */
            if(len == 0)
            {
                gprintln("Watcher: Ignoring empty datagram", DebugType.ERROR);
                continue;
            }

            /**
            * Hand the decoder its own copy: the buffer is overwritten by the
            * next receive while the decoded message may still be queued
            */
            byte[] data = buffer[0..len].dup;

            /* Decode the message */
            LinkMessage message = Link.decode(data);

            /* If decoding worked */
            if(message)
            {
                /* Couple Address-and-message */
                LinkUnit unit = new LinkUnit(address, message, link);

                /* Queue the message for processing */
                link.enqueueIn(unit);
            }
            /* If ProtocolBuffer decoding failed */
            else
            {
                gprintln("Watcher: ProtocolBuffer decode failed", DebugType.ERROR);
            }
        }
    }
}