1 /*
2 * hunt-proton: AMQP Protocol library for D programming language.
3 *
4 * Copyright (C) 2018-2019 HuntLabs
5 *
6 * Website: https://www.huntlabs.net/
7 *
8 * Licensed under the Apache-2.0 License.
9 *
10 */
11
12 module hunt.proton.engine.impl.LinkImpl;
13
14 import hunt.collection.Set;
15 import hunt.collection.Map;
16
17 import hunt.proton.amqp.Symbol;
18 import hunt.proton.amqp.UnsignedLong;
19 import hunt.proton.amqp.transport.ReceiverSettleMode;
20 import hunt.proton.amqp.transport.SenderSettleMode;
21 import hunt.proton.amqp.transport.Source;
22 import hunt.proton.amqp.transport.Target;
23 import hunt.proton.engine.EndpointState;
24 import hunt.proton.engine.Event;
25 import hunt.proton.engine.Link;
26 import hunt.proton.engine.impl.EndpointImpl;
27 import hunt.proton.engine.impl.DeliveryImpl;
28 import hunt.proton.engine.impl.LinkNode;
29 import hunt.Exceptions;
30 import hunt.proton.engine.impl.SessionImpl;
31 import hunt.proton.engine.impl.ConnectionImpl;
32 import hunt.proton.engine.impl.TransportLink;
33 import hunt.collection.Set;
34 import hunt.proton.engine.impl.EndpointImplQuery;
35 import hunt.proton.engine.impl.SenderImpl;
36
37 class LinkImpl : EndpointImpl , Link
38 {
39
40 private SessionImpl _session;
41
42 DeliveryImpl _head;
43 DeliveryImpl _tail;
44 DeliveryImpl _current;
45 private string _name;
46 private Source _source;
47 private Source _remoteSource;
48 private Target _target;
49 private Target _remoteTarget;
50 private int _queued;
51 private int _credit;
52 private int _unsettled;
53 private int _drained;
54 private UnsignedLong _maxMessageSize;
55 private UnsignedLong _remoteMaxMessageSize;
56
57 private SenderSettleMode _senderSettleMode;
58 private SenderSettleMode _remoteSenderSettleMode;
59 private ReceiverSettleMode _receiverSettleMode;
60 private ReceiverSettleMode _remoteReceiverSettleMode;
61
62
63 private LinkNode!(LinkImpl) _node;
64 private bool _drain;
65 private bool _detached;
66 private Map!(Symbol, Object) _properties;
67 private Map!(Symbol, Object) _remoteProperties;
68 private Symbol[] _offeredCapabilities;
69 private Symbol[] _remoteOfferedCapabilities;
70 private Symbol[] _desiredCapabilities;
71 private Symbol[] _remoteDesiredCapabilities;
72
73 this(SessionImpl session, string name)
74 {
75 _session = session;
76 _session.incref();
77 _name = name;
78 ConnectionImpl conn = session.getConnectionImpl();
79 _node = conn.addLinkEndpoint(this);
80 conn.put(Type.LINK_INIT, this);
81 }
82
83
84 public string getName()
85 {
86 return _name;
87 }
88
89 public DeliveryImpl delivery(byte[] tag)
90 {
91 return delivery(tag, 0, cast(int)tag.length);
92 }
93
94 public DeliveryImpl delivery(byte[] tag, int offset, int length)
95 {
96 if (offset != 0 || length != tag.length)
97 {
98 throw new IllegalArgumentException("At present delivery tag must be the whole byte array");
99 }
100 incrementQueued();
101 try
102 {
103 DeliveryImpl delivery = new DeliveryImpl(tag, this, _tail);
104 if (_tail is null)
105 {
106 _head = delivery;
107 }
108 _tail = delivery;
109 if (_current is null)
110 {
111 _current = delivery;
112 }
113 getConnectionImpl().workUpdate(delivery);
114 return delivery;
115 }
116 catch (RuntimeException e)
117 {
118 //e.printStackTrace();
119 //throw e;
120 }
121 return null;
122 }
123
124 override
125 void postFinal() {
126 _session.getConnectionImpl().put(Type.LINK_FINAL, this);
127 _session.decref();
128 }
129
130 override
131 void doFree()
132 {
133 DeliveryImpl dlv = _head;
134 while (dlv !is null) {
135 DeliveryImpl next = dlv.next();
136 dlv.free();
137 dlv = next;
138 }
139
140 _session.getConnectionImpl().removeLinkEndpoint(_node);
141 _node = null;
142 }
143
144 void modifyEndpoints() {
145 modified();
146 }
147
148 /*
149 * Called when settling a message to ensure that the head/tail refs of the link are updated.
150 * The caller ensures the delivery updates its own refs appropriately.
151 */
152 void remove(DeliveryImpl delivery)
153 {
154 if(_head == delivery)
155 {
156 _head = delivery.getLinkNext();
157 }
158 if(_tail == delivery)
159 {
160 _tail = delivery.getLinkPrevious();
161 }
162 }
163
164 public DeliveryImpl current()
165 {
166 return _current;
167 }
168
169 public bool advance()
170 {
171 if(_current !is null )
172 {
173 DeliveryImpl oldCurrent = _current;
174 _current = _current.getLinkNext();
175 getConnectionImpl().workUpdate(oldCurrent);
176
177 if(_current !is null)
178 {
179 getConnectionImpl().workUpdate(_current);
180 }
181 return true;
182 }
183 else
184 {
185 return false;
186 }
187
188 }
189
190 override
191 public ConnectionImpl getConnectionImpl()
192 {
193 return _session.getConnectionImpl();
194 }
195
196 public SessionImpl getSession()
197 {
198 return _session;
199 }
200
201 public Source getRemoteSource()
202 {
203 return _remoteSource;
204 }
205
206 void setRemoteSource(Source source)
207 {
208 _remoteSource = source;
209 }
210
211 public Target getRemoteTarget()
212 {
213 return _remoteTarget;
214 }
215
216 void setRemoteTarget(Target target)
217 {
218 _remoteTarget = target;
219 }
220
221 public Source getSource()
222 {
223 return _source;
224 }
225
226 public void setSource(Source source)
227 {
228 // TODO - should be an error if local state is ACTIVE
229 _source = source;
230 }
231
232 public Target getTarget()
233 {
234 return _target;
235 }
236
237 public void setTarget(Target target)
238 {
239 // TODO - should be an error if local state is ACTIVE
240 _target = target;
241 }
242
243 public Link next(Set!EndpointState local, Set!EndpointState remote)
244 {
245 Query!LinkImpl query = new EndpointImplQuery!LinkImpl(local, remote);
246
247 LinkNode!LinkImpl linkNode = _node.next(query);
248
249 return linkNode is null ? null : linkNode.getValue();
250
251 }
252
253 abstract TransportLink getTransportLink();
254
255 public int getCredit()
256 {
257 return _credit;
258 }
259
260 public void addCredit(int credit)
261 {
262 _credit+=credit;
263 }
264
265 public void setCredit(int credit)
266 {
267 _credit = credit;
268 }
269
270 bool hasCredit()
271 {
272 return _credit > 0;
273 }
274
275 void incrementCredit()
276 {
277 _credit++;
278 }
279
280 void decrementCredit()
281 {
282 _credit--;
283 }
284
285 public int getQueued()
286 {
287 return _queued;
288 }
289
290 void incrementQueued()
291 {
292 _queued++;
293 }
294
295 void decrementQueued()
296 {
297 _queued--;
298 }
299
300 public int getUnsettled()
301 {
302 return _unsettled;
303 }
304
305 void incrementUnsettled()
306 {
307 _unsettled++;
308 }
309
310 void decrementUnsettled()
311 {
312 _unsettled--;
313 }
314
315 void setDrain(bool drain)
316 {
317 _drain = drain;
318 }
319
320 override
321 public bool getDrain()
322 {
323 return _drain;
324 }
325
326 override
327 public SenderSettleMode getSenderSettleMode()
328 {
329 return _senderSettleMode;
330 }
331
332 override
333 public void setSenderSettleMode(SenderSettleMode senderSettleMode)
334 {
335 _senderSettleMode = senderSettleMode;
336 }
337
338 override
339 public SenderSettleMode getRemoteSenderSettleMode()
340 {
341 return _remoteSenderSettleMode;
342 }
343
344 override
345 public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode)
346 {
347 _remoteSenderSettleMode = remoteSenderSettleMode;
348 }
349
350 override
351 public ReceiverSettleMode getReceiverSettleMode()
352 {
353 return _receiverSettleMode;
354 }
355
356 override
357 public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode)
358 {
359 _receiverSettleMode = receiverSettleMode;
360 }
361
362 override
363 public ReceiverSettleMode getRemoteReceiverSettleMode()
364 {
365 return _remoteReceiverSettleMode;
366 }
367
368 void setRemoteReceiverSettleMode(ReceiverSettleMode remoteReceiverSettleMode)
369 {
370 _remoteReceiverSettleMode = remoteReceiverSettleMode;
371 }
372
373 override
374 public Map!(Symbol, Object) getProperties()
375 {
376 return _properties;
377 }
378
379 override
380 public void setProperties(Map!(Symbol, Object) properties)
381 {
382 _properties = properties;
383 }
384
385 override
386 public Map!(Symbol, Object) getRemoteProperties()
387 {
388 return _remoteProperties;
389 }
390
391 void setRemoteProperties(Map!(Symbol, Object) remoteProperties)
392 {
393 _remoteProperties = remoteProperties;
394 }
395
396 override
397 public Symbol[] getDesiredCapabilities()
398 {
399 return _desiredCapabilities;
400 }
401
402 override
403 public void setDesiredCapabilities(Symbol[] desiredCapabilities)
404 {
405 _desiredCapabilities = desiredCapabilities;
406 }
407
408 override
409 public Symbol[] getRemoteDesiredCapabilities()
410 {
411 return _remoteDesiredCapabilities;
412 }
413
414 void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
415 {
416 _remoteDesiredCapabilities = remoteDesiredCapabilities;
417 }
418
419 override
420 public Symbol[] getOfferedCapabilities()
421 {
422 return _offeredCapabilities;
423 }
424
425 override
426 public void setOfferedCapabilities(Symbol[] offeredCapabilities)
427 {
428 _offeredCapabilities = offeredCapabilities;
429 }
430
431 override
432 public Symbol[] getRemoteOfferedCapabilities()
433 {
434 return _remoteOfferedCapabilities;
435 }
436
437 void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
438 {
439 _remoteOfferedCapabilities = remoteOfferedCapabilities;
440 }
441
442 override
443 public UnsignedLong getMaxMessageSize()
444 {
445 return _maxMessageSize;
446 }
447
448 override
449 public void setMaxMessageSize(UnsignedLong maxMessageSize)
450 {
451 _maxMessageSize = maxMessageSize;
452 }
453
454 override
455 public UnsignedLong getRemoteMaxMessageSize()
456 {
457 return _remoteMaxMessageSize;
458 }
459
460 void setRemoteMaxMessageSize(UnsignedLong remoteMaxMessageSize)
461 {
462 _remoteMaxMessageSize = remoteMaxMessageSize;
463 }
464
465 override
466 public int drained()
467 {
468 int drained = 0;
469
470 if (cast(SenderImpl)this !is null) {
471 if(getDrain() && hasCredit())
472 {
473 _drained = getCredit();
474 setCredit(0);
475 modified();
476 drained = _drained;
477 }
478 } else {
479 drained = _drained;
480 _drained = 0;
481 }
482
483 return drained;
484 }
485
486 int getDrained()
487 {
488 return _drained;
489 }
490
491 void setDrained(int value)
492 {
493 _drained = value;
494 }
495
496 override
497 public DeliveryImpl head()
498 {
499 return _head;
500 }
501
502 override
503 void localOpen()
504 {
505 getConnectionImpl().put(Type.LINK_LOCAL_OPEN, this);
506 }
507
508 override
509 void localClose()
510 {
511 getConnectionImpl().put(Type.LINK_LOCAL_CLOSE, this);
512 }
513
514 override
515 public void detach()
516 {
517 _detached = true;
518 getConnectionImpl().put(Type.LINK_LOCAL_DETACH, this);
519 modified();
520 }
521
522 public bool detached()
523 {
524 return _detached;
525 }
526 }