1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.LinkImpl;
13 
14 import hunt.collection.Set;
15 import hunt.collection.Map;
16 
17 import hunt.proton.amqp.Symbol;
18 import hunt.proton.amqp.UnsignedLong;
19 import hunt.proton.amqp.transport.ReceiverSettleMode;
20 import hunt.proton.amqp.transport.SenderSettleMode;
21 import hunt.proton.amqp.transport.Source;
22 import hunt.proton.amqp.transport.Target;
23 import hunt.proton.engine.EndpointState;
24 import hunt.proton.engine.Event;
25 import hunt.proton.engine.Link;
26 import hunt.proton.engine.impl.EndpointImpl;
27 import hunt.proton.engine.impl.DeliveryImpl;
28 import hunt.proton.engine.impl.LinkNode;
29 import hunt.Exceptions;
30 import hunt.proton.engine.impl.SessionImpl;
31 import hunt.proton.engine.impl.ConnectionImpl;
32 import hunt.proton.engine.impl.TransportLink;
33 import hunt.collection.Set;
34 import hunt.proton.engine.impl.EndpointImplQuery;
35 import hunt.proton.engine.impl.SenderImpl;
36 
/**
 * Base implementation of the Link endpoint.
 *
 * A link belongs to a SessionImpl, registers itself with the session's
 * ConnectionImpl on construction, and keeps a doubly linked list of
 * DeliveryImpl objects (head / tail) plus a "current" delivery cursor.
 * It also stores the local and remote attach parameters (source, target,
 * settle modes, properties, capabilities, max message size) and the
 * credit / queued / unsettled / drained counters.
 *
 * The class is abstract: subclasses must provide getTransportLink().
 */
class LinkImpl : EndpointImpl , Link
{

    /// Owning session; incref'd in the constructor, decref'd in postFinal().
    private  SessionImpl _session;

    /// First delivery in this link's delivery list (null when empty).
    DeliveryImpl _head;
    /// Last delivery in this link's delivery list (null when empty).
    DeliveryImpl _tail;
    /// Delivery cursor returned by current() and moved forward by advance().
    DeliveryImpl _current;
    private string _name;
    private Source _source;
    private Source _remoteSource;
    private Target _target;
    private Target _remoteTarget;
    private int _queued;
    private int _credit;
    private int _unsettled;
    private int _drained;
    private UnsignedLong _maxMessageSize;
    private UnsignedLong _remoteMaxMessageSize;

    private SenderSettleMode _senderSettleMode;
    private SenderSettleMode _remoteSenderSettleMode;
    private ReceiverSettleMode _receiverSettleMode;
    private ReceiverSettleMode _remoteReceiverSettleMode;


    /// Node handed back by ConnectionImpl.addLinkEndpoint(); set to null by doFree().
    private LinkNode!(LinkImpl) _node;
    private bool _drain;
    private bool _detached;
    private Map!(Symbol, Object) _properties;
    private Map!(Symbol, Object) _remoteProperties;
    private Symbol[] _offeredCapabilities;
    private Symbol[] _remoteOfferedCapabilities;
    private Symbol[] _desiredCapabilities;
    private Symbol[] _remoteDesiredCapabilities;

    /**
     * Creates a link on the given session.
     *
     * Takes a reference on the session, registers this link as an endpoint
     * of the session's connection and emits a LINK_INIT event on it.
     *
     * Params:
     *   session = the owning session
     *   name    = the link name
     */
    this(SessionImpl session, string name)
    {
        _session = session;
        _session.incref();
        _name = name;
        ConnectionImpl conn = session.getConnectionImpl();
        _node = conn.addLinkEndpoint(this);
        conn.put(Type.LINK_INIT, this);
    }


    /// Returns the link name given at construction.
    public string getName()
    {
        return _name;
    }

    /**
     * Creates a delivery whose tag is the whole of `tag`.
     *
     * Convenience overload of delivery(tag, offset, length).
     */
    public DeliveryImpl delivery(byte[] tag)
    {
        return delivery(tag, 0, cast(int)tag.length);
    }

    /**
     * Creates a new delivery and appends it to this link's delivery list.
     *
     * The queued counter is incremented, the new delivery becomes the tail
     * (and the head if the list was empty, and the current delivery if there
     * was none), and the connection is told about it through workUpdate().
     *
     * Any exception raised while creating or registering the delivery is
     * propagated to the caller. Earlier revisions swallowed RuntimeException
     * and returned null, which hid the failure and left the link's counters
     * and list in a partially updated state.
     *
     * Params:
     *   tag    = the delivery tag
     *   offset = must be 0
     *   length = must equal tag.length
     *
     * Returns: the newly created delivery (never null).
     *
     * Throws: IllegalArgumentException if offset/length do not cover the
     *         whole tag array.
     */
    public DeliveryImpl delivery(byte[] tag, int offset, int length)
    {
        if (offset != 0 || length != tag.length)
        {
            throw new IllegalArgumentException("At present delivery tag must be the whole byte array");
        }
        incrementQueued();

        DeliveryImpl dlv = new DeliveryImpl(tag, this, _tail);
        if (_tail is null)
        {
            // Empty list: the new delivery is also the head.
            _head = dlv;
        }
        _tail = dlv;
        if (_current is null)
        {
            _current = dlv;
        }
        getConnectionImpl().workUpdate(dlv);
        return dlv;
    }

    /// Emits LINK_FINAL on the connection and releases the session reference.
    override
    void postFinal() {
        _session.getConnectionImpl().put(Type.LINK_FINAL, this);
        _session.decref();
    }

    /**
     * Frees every delivery reachable from the head, then removes this link's
     * node from the connection and drops the node reference.
     */
    override
    void doFree()
    {
        DeliveryImpl dlv = _head;
        while (dlv !is null) {
            // Fetch the successor before freeing the delivery being visited.
            DeliveryImpl next = dlv.next();
            dlv.free();
            dlv = next;
        }

        _session.getConnectionImpl().removeLinkEndpoint(_node);
        _node = null;
    }

    /// Marks this endpoint as modified.
    void modifyEndpoints() {
        modified();
    }

    /*
     * Called when settling a message to ensure that the head/tail refs of the link are updated.
     * The caller ensures the delivery updates its own refs appropriately.
     *
     * The comparison is by reference identity: only the very object that is
     * currently the head or the tail causes those refs to move.
     */
    void remove(DeliveryImpl delivery)
    {
        if(_head is delivery)
        {
            _head = delivery.getLinkNext();
        }
        if(_tail is delivery)
        {
            _tail = delivery.getLinkPrevious();
        }
    }

    /// Returns the current delivery, or null if there is none.
    public DeliveryImpl current()
    {
        return _current;
    }

    /**
     * Moves the current-delivery cursor to the next delivery on the link.
     *
     * The connection is notified (workUpdate) about the delivery that was
     * current and, if there is one, about the new current delivery.
     *
     * Returns: true if there was a current delivery to advance from,
     *          false otherwise.
     */
    public bool advance()
    {
        if(_current !is null )
        {
            DeliveryImpl oldCurrent = _current;
            _current = _current.getLinkNext();
            getConnectionImpl().workUpdate(oldCurrent);

            if(_current !is null)
            {
                getConnectionImpl().workUpdate(_current);
            }
            return true;
        }
        else
        {
            return false;
        }

    }

    /// Returns the connection of the owning session.
    override
    public ConnectionImpl getConnectionImpl()
    {
        return _session.getConnectionImpl();
    }

    /// Returns the owning session.
    public SessionImpl getSession()
    {
        return _session;
    }

    /// Returns the stored remote source (null until set).
    public Source getRemoteSource()
    {
        return _remoteSource;
    }

    /// Stores the remote source.
    void setRemoteSource(Source source)
    {
        _remoteSource = source;
    }

    /// Returns the stored remote target (null until set).
    public Target getRemoteTarget()
    {
        return _remoteTarget;
    }

    /// Stores the remote target.
    void setRemoteTarget(Target target)
    {
        _remoteTarget = target;
    }

    /// Returns the local source (null until set).
    public Source getSource()
    {
        return _source;
    }

    /// Sets the local source.
    public void setSource(Source source)
    {
        // TODO - should be an error if local state is ACTIVE
        _source = source;
    }

    /// Returns the local target (null until set).
    public Target getTarget()
    {
        return _target;
    }

    /// Sets the local target.
    public void setTarget(Target target)
    {
        // TODO - should be an error if local state is ACTIVE
        _target = target;
    }

    /**
     * Finds the next link after this one that satisfies a query built from
     * the given local and remote endpoint-state sets.
     *
     * Params:
     *   local  = local endpoint states passed to the query
     *   remote = remote endpoint states passed to the query
     *
     * Returns: the value of the next matching link node, or null if the
     *          node search returns null.
     */
    public Link next(Set!EndpointState local, Set!EndpointState remote)
    {
        Query!LinkImpl query = new EndpointImplQuery!LinkImpl(local, remote);

        LinkNode!LinkImpl linkNode = _node.next(query);

        return linkNode is null ? null : linkNode.getValue();

    }

    /// Returns the transport-level link object; supplied by subclasses.
    abstract TransportLink getTransportLink();

    /// Returns the current credit counter.
    public int getCredit()
    {
        return _credit;
    }

    /// Adds `credit` to the credit counter.
    public void addCredit(int credit)
    {
        _credit+=credit;
    }

    /// Overwrites the credit counter.
    public void setCredit(int credit)
    {
        _credit = credit;
    }

    /// Returns true when the credit counter is greater than zero.
    bool hasCredit()
    {
        return _credit > 0;
    }

    /// Increases the credit counter by one.
    void incrementCredit()
    {
        _credit++;
    }

    /// Decreases the credit counter by one.
    void decrementCredit()
    {
        _credit--;
    }

    /// Returns the queued counter.
    public int getQueued()
    {
        return _queued;
    }

    /// Increases the queued counter by one.
    void incrementQueued()
    {
        _queued++;
    }

    /// Decreases the queued counter by one.
    void decrementQueued()
    {
        _queued--;
    }

    /// Returns the unsettled counter.
    public int getUnsettled()
    {
        return _unsettled;
    }

    /// Increases the unsettled counter by one.
    void incrementUnsettled()
    {
        _unsettled++;
    }

    /// Decreases the unsettled counter by one.
    void decrementUnsettled()
    {
        _unsettled--;
    }

    /// Sets the drain flag.
    void setDrain(bool drain)
    {
        _drain = drain;
    }

    /// Returns the drain flag.
    override
    public bool getDrain()
    {
        return _drain;
    }

    /// Returns the local sender settle mode.
    override
    public SenderSettleMode getSenderSettleMode()
    {
        return _senderSettleMode;
    }

    /// Sets the local sender settle mode.
    override
    public void setSenderSettleMode(SenderSettleMode senderSettleMode)
    {
        _senderSettleMode = senderSettleMode;
    }

    /// Returns the stored remote sender settle mode.
    override
    public SenderSettleMode getRemoteSenderSettleMode()
    {
        return _remoteSenderSettleMode;
    }

    /// Stores the remote sender settle mode.
    override
    public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode)
    {
        _remoteSenderSettleMode = remoteSenderSettleMode;
    }

    /// Returns the local receiver settle mode.
    override
    public ReceiverSettleMode getReceiverSettleMode()
    {
        return _receiverSettleMode;
    }

    /// Sets the local receiver settle mode.
    override
    public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode)
    {
        _receiverSettleMode = receiverSettleMode;
    }

    /// Returns the stored remote receiver settle mode.
    override
    public ReceiverSettleMode getRemoteReceiverSettleMode()
    {
        return _remoteReceiverSettleMode;
    }

    /// Stores the remote receiver settle mode.
    void setRemoteReceiverSettleMode(ReceiverSettleMode remoteReceiverSettleMode)
    {
        _remoteReceiverSettleMode = remoteReceiverSettleMode;
    }

    /// Returns the local properties map (the stored reference, not a copy).
    override
    public Map!(Symbol, Object) getProperties()
    {
        return _properties;
    }

    /// Sets the local properties map (the reference is stored as given).
    override
    public void setProperties(Map!(Symbol, Object) properties)
    {
        _properties = properties;
    }

    /// Returns the stored remote properties map.
    override
    public Map!(Symbol, Object) getRemoteProperties()
    {
        return _remoteProperties;
    }

    /// Stores the remote properties map.
    void setRemoteProperties(Map!(Symbol, Object) remoteProperties)
    {
        _remoteProperties = remoteProperties;
    }

    /// Returns the local desired capabilities (the stored slice, not a copy).
    override
    public Symbol[] getDesiredCapabilities()
    {
        return _desiredCapabilities;
    }

    /// Sets the local desired capabilities (the slice is stored as given).
    override
    public void setDesiredCapabilities(Symbol[] desiredCapabilities)
    {
        _desiredCapabilities = desiredCapabilities;
    }

    /// Returns the stored remote desired capabilities.
    override
    public Symbol[] getRemoteDesiredCapabilities()
    {
        return _remoteDesiredCapabilities;
    }

    /// Stores the remote desired capabilities.
    void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
    {
        _remoteDesiredCapabilities = remoteDesiredCapabilities;
    }

    /// Returns the local offered capabilities (the stored slice, not a copy).
    override
    public Symbol[] getOfferedCapabilities()
    {
        return _offeredCapabilities;
    }

    /// Sets the local offered capabilities (the slice is stored as given).
    override
    public void setOfferedCapabilities(Symbol[] offeredCapabilities)
    {
        _offeredCapabilities = offeredCapabilities;
    }

    /// Returns the stored remote offered capabilities.
    override
    public Symbol[] getRemoteOfferedCapabilities()
    {
        return _remoteOfferedCapabilities;
    }

    /// Stores the remote offered capabilities.
    void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
    {
        _remoteOfferedCapabilities = remoteOfferedCapabilities;
    }

    /// Returns the local max message size (null until set).
    override
    public UnsignedLong getMaxMessageSize()
    {
        return _maxMessageSize;
    }

    /// Sets the local max message size.
    override
    public void setMaxMessageSize(UnsignedLong maxMessageSize)
    {
        _maxMessageSize = maxMessageSize;
    }

    /// Returns the stored remote max message size.
    override
    public UnsignedLong getRemoteMaxMessageSize()
    {
        return _remoteMaxMessageSize;
    }

    /// Stores the remote max message size.
    void setRemoteMaxMessageSize(UnsignedLong remoteMaxMessageSize)
    {
        _remoteMaxMessageSize = remoteMaxMessageSize;
    }

    /**
     * Reports drained credit.
     *
     * When this object is a SenderImpl: if the drain flag is set and there
     * is credit, the whole credit is recorded as drained, the credit counter
     * is reset to 0, the endpoint is marked modified and the drained amount
     * is returned; otherwise 0 is returned.
     *
     * When this object is not a SenderImpl: returns the stored drained
     * counter and resets it to 0.
     *
     * Returns: the number of credits drained as described above.
     */
    override
    public int drained()
    {
        int drained = 0;

        if (cast(SenderImpl)this !is null) {
            if(getDrain() && hasCredit())
            {
                _drained = getCredit();
                setCredit(0);
                modified();
                drained = _drained;
            }
        } else {
            drained = _drained;
            _drained = 0;
        }

        return drained;
    }

    /// Returns the stored drained counter without modifying it.
    int getDrained()
    {
        return _drained;
    }

    /// Overwrites the stored drained counter.
    void setDrained(int value)
    {
        _drained = value;
    }

    /// Returns the first delivery on this link, or null if there is none.
    override
    public DeliveryImpl head()
    {
        return _head;
    }

    /// Emits LINK_LOCAL_OPEN on the connection.
    override
    void localOpen()
    {
        getConnectionImpl().put(Type.LINK_LOCAL_OPEN, this);
    }

    /// Emits LINK_LOCAL_CLOSE on the connection.
    override
    void localClose()
    {
        getConnectionImpl().put(Type.LINK_LOCAL_CLOSE, this);
    }

    /**
     * Flags the link as detached, emits LINK_LOCAL_DETACH on the connection
     * and marks the endpoint as modified.
     */
    override
    public void detach()
    {
        _detached = true;
        getConnectionImpl().put(Type.LINK_LOCAL_DETACH, this);
        modified();
    }

    /// Returns true once detach() has been called on this link.
    public bool detached()
    {
        return _detached;
    }
}