1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.TransportSession;
13 
14 import hunt.collection.HashMap;
15 import hunt.collection.Map;
16 
17 import hunt.proton.amqp.Binary;
18 import hunt.proton.amqp.UnsignedInteger;
19 import hunt.proton.amqp.transport.Disposition;
20 import hunt.proton.amqp.transport.Flow;
21 import hunt.proton.amqp.transport.Role;
22 import hunt.proton.amqp.transport.Transfer;
23 import hunt.proton.engine.Event;
24 import std.concurrency : initOnce;
25 import hunt.proton.engine.impl.TransportImpl;
26 import hunt.proton.engine.impl.SessionImpl;
27 import hunt.proton.engine.impl.TransportLink;
28 import hunt.proton.engine.impl.DeliveryImpl;
29 import hunt.proton.engine.impl.TransportReceiver;
30 import hunt.Exceptions;
31 import hunt.proton.engine.impl.ReceiverImpl;
32 import hunt.proton.engine.impl.TransportDelivery;
33 import hunt.Boolean;
34 
35 class TransportSession
36 {
37     private static int HANDLE_MAX = 65535;
38     //private static UnsignedInteger DEFAULT_WINDOW_SIZE = UnsignedInteger.valueOf(2147483647); // biggest legal value
39 
40      static UnsignedInteger  DEFAULT_WINDOW_SIZE() {
41          __gshared UnsignedInteger  inst;
42          return initOnce!inst(UnsignedInteger.valueOf(2147483647));
43      }
44 
45     //static UnsignedInteger  _handleMax() {
46     //    __gshared UnsignedInteger  inst;
47     //    return initOnce!inst(UnsignedInteger.valueOf(HANDLE_MAX));
48     //}
49     //
50     // static UnsignedInteger  _outgoingDeliveryId() {
51     //     __gshared UnsignedInteger  inst;
52     //     return initOnce!inst(UnsignedInteger.ZERO);
53     // }
54     //
55     //static UnsignedInteger  _incomingWindowSize() {
56     //    __gshared UnsignedInteger  inst;
57     //    return initOnce!inst(UnsignedInteger.ZERO);
58     //}
59     //
60     // static UnsignedInteger  _outgoingWindowSize() {
61     //     __gshared UnsignedInteger  inst;
62     //     return initOnce!inst(UnsignedInteger.ZERO);
63     // }
64     //
65     //static UnsignedInteger  _nextOutgoingId() {
66     //    __gshared UnsignedInteger  inst;
67     //    return initOnce!inst(UnsignedInteger.ONE);
68     //}
69 
    // Owning transport and the engine-level session this object tracks; both are set in the constructor.
    private TransportImpl _transport;
    private SessionImpl _session;
    // Channel numbers; -1 means "not set" (see isLocalChannelSet / isRemoteChannelSet).
    private int _localChannel = -1;
    private int _remoteChannel = -1;
    private bool _openSent;
    private UnsignedInteger _handleMax ;// set to UnsignedInteger.valueOf(HANDLE_MAX) in the constructor. TODO: should this be configurable?
    // This is used for the delivery-id actually stamped in each transfer frame of a given message delivery.
    private UnsignedInteger _outgoingDeliveryId ;// starts at UnsignedInteger.ZERO (constructor)
     // These are used for the session windows communicated via Begin/Flow frames
     // and the conceptual transfer-id relating to updating them.
    private UnsignedInteger _incomingWindowSize ;// starts at UnsignedInteger.ZERO (constructor)
    private UnsignedInteger _outgoingWindowSize ;// taken from session.getOutgoingWindow() in the constructor
    private UnsignedInteger _nextOutgoingId  ;// starts at UnsignedInteger.ONE (constructor)
    private UnsignedInteger _nextIncomingId = null;

    // Link lookup tables; all three maps are created as HashMap instances in the constructor.
    private Map!(UnsignedInteger, TransportLink) _remoteHandlesMap ;// keyed by remote handle
    private Map!(UnsignedInteger, TransportLink) _localHandlesMap ;// keyed by local handle
    private Map!(string, TransportLink) _halfOpenLinks  ;// keyed by link name


    // delivery-id of the most recently started incoming delivery (null until the first one arrives).
    private UnsignedInteger _incomingDeliveryId = null;
    private UnsignedInteger _remoteIncomingWindow;
    private UnsignedInteger _remoteOutgoingWindow;
    private UnsignedInteger _remoteNextIncomingId ;// initialised to _nextOutgoingId in the constructor
    private UnsignedInteger _remoteNextOutgoingId;
    // Unsettled deliveries keyed by delivery-id; both maps are created in the constructor.
    private Map!(UnsignedInteger, DeliveryImpl) _unsettledIncomingDeliveriesById  ;
    private Map!(UnsignedInteger, DeliveryImpl) _unsettledOutgoingDeliveriesById ;
    // Incremented once per transfer frame handled by handleTransfer.
    private int _unsettledIncomingSize;
    private bool _endReceived;
    private bool _beginSent;
100 
101 
102     this(TransportImpl transport, SessionImpl session)
103     {
104         _transport = transport;
105         _session = session;
106         _outgoingWindowSize = UnsignedInteger.valueOf(session.getOutgoingWindow());
107 
108         _handleMax = UnsignedInteger.valueOf(HANDLE_MAX);
109         _outgoingDeliveryId = UnsignedInteger.ZERO;
110         _incomingWindowSize = UnsignedInteger.ZERO;
111        //
112         // _outgoingWindowSize = UnsignedInteger.ZERO;
113         _nextOutgoingId = UnsignedInteger.ONE;
114 
115         _remoteNextIncomingId = _nextOutgoingId;
116         _remoteHandlesMap = new HashMap!(UnsignedInteger, TransportLink)();
117         _localHandlesMap = new HashMap!(UnsignedInteger, TransportLink)();
118          _halfOpenLinks = new HashMap!(string, TransportLink)();
119         _unsettledIncomingDeliveriesById = new HashMap!(UnsignedInteger, DeliveryImpl)();
120         _unsettledOutgoingDeliveriesById = new HashMap!(UnsignedInteger, DeliveryImpl)();
121     }
122 
123     void unbind()
124     {
125         unsetLocalChannel();
126         unsetRemoteChannel();
127     }
128 
129     public SessionImpl getSession()
130     {
131         return _session;
132     }
133 
134     public int getLocalChannel()
135     {
136         return _localChannel;
137     }
138 
139     public void setLocalChannel(int localChannel)
140     {
141         if (!isLocalChannelSet()) {
142             _session.incref();
143         }
144         _localChannel = localChannel;
145     }
146 
147     public int getRemoteChannel()
148     {
149         return _remoteChannel;
150     }
151 
152     public void setRemoteChannel(int remoteChannel)
153     {
154         if (!isRemoteChannelSet()) {
155             _session.incref();
156         }
157         _remoteChannel = remoteChannel;
158     }
159 
160     public bool isOpenSent()
161     {
162         return _openSent;
163     }
164 
165     public void setOpenSent(bool openSent)
166     {
167         _openSent = openSent;
168     }
169 
170     public bool isRemoteChannelSet()
171     {
172         return _remoteChannel != -1;
173     }
174 
175     public bool isLocalChannelSet()
176     {
177         return _localChannel != -1;
178     }
179 
180     public void unsetLocalChannel()
181     {
182         if (isLocalChannelSet()) {
183             unsetLocalHandles();
184             _session.decref();
185         }
186         _localChannel = -1;
187     }
188 
189     private void unsetLocalHandles()
190     {
191         foreach (TransportLink tl ; _localHandlesMap.values())
192         {
193             tl.clearLocalHandle();
194         }
195         _localHandlesMap.clear();
196     }
197 
198     public void unsetRemoteChannel()
199     {
200         if (isRemoteChannelSet()) {
201             unsetRemoteHandles();
202             _session.decref();
203         }
204         _remoteChannel = -1;
205     }
206 
207     private void unsetRemoteHandles()
208     {
209         foreach (TransportLink tl ; _remoteHandlesMap.values())
210         {
211             tl.clearRemoteHandle();
212         }
213         _remoteHandlesMap.clear();
214     }
215 
216     public UnsignedInteger getHandleMax()
217     {
218         return _handleMax;
219     }
220 
221     public UnsignedInteger getIncomingWindowSize()
222     {
223         return _incomingWindowSize;
224     }
225 
226     void updateIncomingWindow()
227     {
228         int incomingCapacity = _session.getIncomingCapacity();
229         int size = _transport.getMaxFrameSize();
230         if (incomingCapacity <= 0 || size <= 0) {
231             _incomingWindowSize = DEFAULT_WINDOW_SIZE;
232         } else {
233             _incomingWindowSize = UnsignedInteger.valueOf((incomingCapacity - _session.getIncomingBytes())/size);
234         }
235     }
236 
237     public UnsignedInteger getOutgoingDeliveryId()
238     {
239         return _outgoingDeliveryId;
240     }
241 
242     void incrementOutgoingDeliveryId()
243     {
244         _outgoingDeliveryId = _outgoingDeliveryId.add(UnsignedInteger.ONE);
245     }
246 
247     public UnsignedInteger getOutgoingWindowSize()
248     {
249         return _outgoingWindowSize;
250     }
251 
252     public UnsignedInteger getNextOutgoingId()
253     {
254         return _nextOutgoingId;
255     }
256 
257     public TransportLink getLinkFromRemoteHandle(UnsignedInteger handle)
258     {
259         return _remoteHandlesMap.get(handle);
260     }
261 
262     public UnsignedInteger allocateLocalHandle(TransportLink transportLink)
263     {
264         for(int i = 0; i <= HANDLE_MAX; i++)
265         {
266             UnsignedInteger handle = UnsignedInteger.valueOf(i);
267             if(!_localHandlesMap.containsKey(handle))
268             {
269                 _localHandlesMap.put(handle, transportLink);
270                 transportLink.setLocalHandle(handle);
271                 return handle;
272             }
273         }
274         throw new IllegalStateException("no local handle available for allocation");
275     }
276 
277     public void addLinkRemoteHandle(TransportLink link, UnsignedInteger remoteHandle)
278     {
279         _remoteHandlesMap.put(remoteHandle, link);
280     }
281 
282     public void addLinkLocalHandle(TransportLink link, UnsignedInteger localhandle)
283     {
284         _localHandlesMap.put(localhandle, link);
285     }
286 
287     public void freeLocalHandle(UnsignedInteger handle)
288     {
289         _localHandlesMap.remove(handle);
290     }
291 
292     public void freeRemoteHandle(UnsignedInteger handle)
293     {
294         _remoteHandlesMap.remove(handle);
295     }
296 
297     public TransportLink resolveHalfOpenLink(string name)
298     {
299         return _halfOpenLinks.remove(name);
300     }
301 
302     public void addHalfOpenLink(TransportLink link)
303     {
304         _halfOpenLinks.put(link.getName(), link);
305     }
306 
307     public void handleTransfer(Transfer transfer, Binary payload)
308     {
309         DeliveryImpl delivery;
310         incrementNextIncomingId(); // The conceptual/non-wire transfer-id, for the session window.
311 
312         TransportReceiver transportReceiver = cast(TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
313         UnsignedInteger linkIncomingDeliveryId = transportReceiver.getIncomingDeliveryId();
314         UnsignedInteger deliveryId = transfer.getDeliveryId();
315 
316         if(linkIncomingDeliveryId !is null && (linkIncomingDeliveryId == (deliveryId) || deliveryId is null))
317         {
318             delivery = _unsettledIncomingDeliveriesById.get(linkIncomingDeliveryId);
319             delivery.getTransportDelivery().incrementSessionSize();
320         }
321         else
322         {
323             verifyNewDeliveryIdSequence(_incomingDeliveryId, linkIncomingDeliveryId, deliveryId);
324 
325             _incomingDeliveryId = deliveryId;
326 
327             ReceiverImpl receiver = transportReceiver.getReceiver();
328             Binary deliveryTag = transfer.getDeliveryTag();
329             delivery = receiver.delivery(deliveryTag.getArray(), deliveryTag.getArrayOffset(),
330                                                       deliveryTag.getLength());
331             UnsignedInteger messageFormat = transfer.getMessageFormat();
332             if(messageFormat !is null) {
333                 delivery.setMessageFormat(messageFormat.intValue());
334             }
335             TransportDelivery transportDelivery = new TransportDelivery(deliveryId, delivery, transportReceiver);
336             delivery.setTransportDelivery(transportDelivery);
337             transportReceiver.setIncomingDeliveryId(deliveryId);
338             _unsettledIncomingDeliveriesById.put(deliveryId, delivery);
339             getSession().incrementIncomingDeliveries(1);
340         }
341 
342         if( transfer.getState()!is null )
343         {
344             delivery.setRemoteDeliveryState(transfer.getState());
345         }
346         _unsettledIncomingSize++;
347 
348         bool aborted = transfer.getAborted().booleanValue;
349         if (payload !is null && !aborted)
350         {
351             delivery.append(payload);
352             getSession().incrementIncomingBytes(payload.getLength());
353         }
354 
355         delivery.updateWork();
356 
357         if(!transfer.getMore().booleanValue || aborted)
358         {
359             transportReceiver.setIncomingDeliveryId(null);
360             if(aborted) {
361                 delivery.setAborted();
362             } else {
363                 delivery.setComplete();
364             }
365 
366             delivery.getLink().getTransportLink().decrementLinkCredit();
367             delivery.getLink().getTransportLink().incrementDeliveryCount();
368         }
369 
370         if(Boolean.TRUE == (transfer.getSettled()) || aborted)
371         {
372             delivery.setRemoteSettled(true);
373         }
374 
375         _incomingWindowSize = _incomingWindowSize.subtract(UnsignedInteger.ONE);
376 
377         // this will cause a flow to happen
378         if (_incomingWindowSize == (UnsignedInteger.ZERO)) {
379             delivery.getLink().modified(false);
380         }
381 
382         getSession().getConnection().put(Type.DELIVERY, delivery);
383     }
384 
385     private void verifyNewDeliveryIdSequence(UnsignedInteger previousId, UnsignedInteger linkIncomingId, UnsignedInteger newDeliveryId) {
386         if(newDeliveryId is null) {
387             throw new IllegalStateException("No delivery-id specified on first Transfer of new delivery");
388         }
389 
390         // Doing a primitive comparison, uses intValue() since its a uint sequence
391         // and we need the primitive values to wrap appropriately during comparison.
392         if(previousId !is null && previousId.intValue() + 1 != newDeliveryId.intValue()) {
393             throw new IllegalStateException("Expected delivery-id " );
394         }
395 
396         if(linkIncomingId !is null) {
397             throw new IllegalStateException("Illegal multiplex of deliveries on same link with delivery-id ");
398         }
399     }
400 
401     public void freeLocalChannel()
402     {
403         unsetLocalChannel();
404     }
405 
406     public void freeRemoteChannel()
407     {
408         unsetRemoteChannel();
409     }
410 
411     private void setRemoteIncomingWindow(UnsignedInteger incomingWindow)
412     {
413         _remoteIncomingWindow = incomingWindow;
414     }
415 
416     void decrementRemoteIncomingWindow()
417     {
418         _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
419     }
420 
421     private void setRemoteOutgoingWindow(UnsignedInteger outgoingWindow)
422     {
423         _remoteOutgoingWindow = outgoingWindow;
424     }
425 
426     void handleFlow(Flow flow)
427     {
428         UnsignedInteger inext = flow.getNextIncomingId();
429         UnsignedInteger iwin = flow.getIncomingWindow();
430 
431         if(inext !is null)
432         {
433             setRemoteNextIncomingId(inext);
434             setRemoteIncomingWindow(inext.add(iwin).subtract(_nextOutgoingId));
435         }
436         else
437         {
438             setRemoteIncomingWindow(iwin);
439         }
440         setRemoteNextOutgoingId(flow.getNextOutgoingId());
441         setRemoteOutgoingWindow(flow.getOutgoingWindow());
442 
443         if(flow.getHandle() !is null)
444         {
445             TransportLink transportLink = getLinkFromRemoteHandle(flow.getHandle());
446             transportLink.handleFlow(flow);
447 
448 
449         }
450     }
451 
452     private void setRemoteNextOutgoingId(UnsignedInteger nextOutgoingId)
453     {
454         _remoteNextOutgoingId = nextOutgoingId;
455     }
456 
457     private void setRemoteNextIncomingId(UnsignedInteger remoteNextIncomingId)
458     {
459         _remoteNextIncomingId = remoteNextIncomingId;
460     }
461 
462     void handleDisposition(Disposition disposition)
463     {
464         UnsignedInteger id = disposition.getFirst();
465         UnsignedInteger last = disposition.getLast() is null ? id : disposition.getLast();
466         Map!(UnsignedInteger, DeliveryImpl) unsettledDeliveries =
467                 disposition.getRole() == Role.RECEIVER ? _unsettledOutgoingDeliveriesById
468                         : _unsettledIncomingDeliveriesById;
469 
470         while(id <= (last))
471         {
472             DeliveryImpl delivery = unsettledDeliveries.get(id);
473             if(delivery !is null)
474             {
475                 if(disposition.getState() !is null)
476                 {
477                     delivery.setRemoteDeliveryState(disposition.getState());
478                 }
479                 if(Boolean.TRUE == (disposition.getSettled()))
480                 {
481                     delivery.setRemoteSettled(true);
482                     unsettledDeliveries.remove(id);
483                 }
484                 delivery.updateWork();
485 
486                 getSession().getConnection().put(Type.DELIVERY, delivery);
487             }
488             id = id.add(UnsignedInteger.ONE);
489         }
490         //TODO - Implement.
491     }
492 
493     void addUnsettledOutgoing(UnsignedInteger deliveryId, DeliveryImpl delivery)
494     {
495         _unsettledOutgoingDeliveriesById.put(deliveryId, delivery);
496     }
497 
498     public bool hasOutgoingCredit()
499     {
500         return _remoteIncomingWindow is null ? false
501             : _remoteIncomingWindow > (UnsignedInteger.ZERO);
502 
503     }
504 
505     void incrementOutgoingId()
506     {
507         _nextOutgoingId = _nextOutgoingId.add(UnsignedInteger.ONE);
508     }
509 
510     public void settled(TransportDelivery transportDelivery)
511     {
512         if( cast(ReceiverImpl) (transportDelivery.getTransportLink().getLink()) !is null )
513         {
514             _unsettledIncomingDeliveriesById.remove(transportDelivery.getDeliveryId());
515             getSession().modified(false);
516         }
517         else
518         {
519             _unsettledOutgoingDeliveriesById.remove(transportDelivery.getDeliveryId());
520             getSession().modified(false);
521         }
522     }
523 
524     public UnsignedInteger getNextIncomingId()
525     {
526         return _nextIncomingId;
527     }
528 
529     public void setNextIncomingId(UnsignedInteger nextIncomingId)
530     {
531         _nextIncomingId = nextIncomingId;
532     }
533 
534     public void incrementNextIncomingId()
535     {
536         _nextIncomingId = _nextIncomingId.add(UnsignedInteger.ONE);
537     }
538 
539     public bool endReceived()
540     {
541         return _endReceived;
542     }
543 
544     public void receivedEnd()
545     {
546         _endReceived = true;
547     }
548 
549     public bool beginSent()
550     {
551         return _beginSent;
552     }
553 
554     public void sentBegin()
555     {
556         _beginSent = true;
557     }
558 }