/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.proton.engine.impl.LinkImpl;

import hunt.collection.Set;
import hunt.collection.Map;

import hunt.proton.amqp.Symbol;
import hunt.proton.amqp.UnsignedLong;
import hunt.proton.amqp.transport.ReceiverSettleMode;
import hunt.proton.amqp.transport.SenderSettleMode;
import hunt.proton.amqp.transport.Source;
import hunt.proton.amqp.transport.Target;
import hunt.proton.engine.EndpointState;
import hunt.proton.engine.Event;
import hunt.proton.engine.Link;
import hunt.proton.engine.impl.EndpointImpl;
import hunt.proton.engine.impl.DeliveryImpl;
import hunt.proton.engine.impl.LinkNode;
import hunt.Exceptions;
import hunt.proton.engine.impl.SessionImpl;
import hunt.proton.engine.impl.ConnectionImpl;
import hunt.proton.engine.impl.TransportLink;
import hunt.collection.Set;
import hunt.proton.engine.impl.EndpointImplQuery;
import hunt.proton.engine.impl.SenderImpl;

/**
 * Base implementation of a Link endpoint.
 *
 * Holds the link's name, local and remote source/target, settle modes,
 * properties, capabilities and message-size limits, together with the
 * credit/queued/unsettled/drained counters and the head/tail/current
 * references into the link's list of deliveries.
 *
 * The class declares an abstract method (getTransportLink), so it can only
 * be instantiated through a subclass.
 */
class LinkImpl : EndpointImpl , Link
{

    private SessionImpl _session;

    // First, last and "current" delivery of this link's delivery list.
    DeliveryImpl _head;
    DeliveryImpl _tail;
    DeliveryImpl _current;
    private string _name;
    private Source _source;
    private Source _remoteSource;
    private Target _target;
    private Target _remoteTarget;
    private int _queued;
    private int _credit;
    private int _unsettled;
    private int _drained;
    private UnsignedLong _maxMessageSize;
    private UnsignedLong _remoteMaxMessageSize;

    private SenderSettleMode _senderSettleMode;
    private SenderSettleMode _remoteSenderSettleMode;
    private ReceiverSettleMode _receiverSettleMode;
    private ReceiverSettleMode _remoteReceiverSettleMode;


    // Node returned by ConnectionImpl.addLinkEndpoint(); reset to null in doFree().
    private LinkNode!(LinkImpl) _node;
    private bool _drain;
    private bool _detached;
    private Map!(Symbol, Object) _properties;
    private Map!(Symbol, Object) _remoteProperties;
    private Symbol[] _offeredCapabilities;
    private Symbol[] _remoteOfferedCapabilities;
    private Symbol[] _desiredCapabilities;
    private Symbol[] _remoteDesiredCapabilities;

    /**
     * Creates a link named `name` on `session`.
     *
     * Takes a reference on the session (incref), registers this link with
     * the session's connection via addLinkEndpoint() and puts a
     * Type.LINK_INIT entry for this link on the connection.
     *
     * Params:
     *   session = the owning session
     *   name    = the link name returned by getName()
     */
    this(SessionImpl session, string name)
    {
        _session = session;
        _session.incref();
        _name = name;
        ConnectionImpl conn = session.getConnectionImpl();
        _node = conn.addLinkEndpoint(this);
        conn.put(Type.LINK_INIT, this);
    }


    /// Returns the name given at construction.
    public string getName()
    {
        return _name;
    }

    /**
     * Creates a delivery whose tag is the whole of `tag`.
     *
     * Equivalent to `delivery(tag, 0, cast(int) tag.length)`.
     */
    public DeliveryImpl delivery(byte[] tag)
    {
        return delivery(tag, 0, cast(int)tag.length);
    }

    /**
     * Creates a delivery for `tag` and appends it to this link's delivery list.
     *
     * The new delivery becomes the tail; it also becomes the head when the
     * list was empty and the current delivery when there was none. The
     * connection is notified through workUpdate().
     *
     * Params:
     *   tag    = the delivery tag
     *   offset = must be 0
     *   length = must equal `tag.length`
     *
     * Returns: the new delivery, or null if a RuntimeException was raised
     *          while creating or registering it.
     *
     * Throws: IllegalArgumentException if `offset`/`length` do not describe
     *         the whole array.
     */
    public DeliveryImpl delivery(byte[] tag, int offset, int length)
    {
        if (offset != 0 || length != tag.length)
        {
            throw new IllegalArgumentException("At present delivery tag must be the whole byte array");
        }
        incrementQueued();
        // Declared outside the try so the handler can tell whether the
        // delivery object was ever created.
        DeliveryImpl created = null;
        try
        {
            created = new DeliveryImpl(tag, this, _tail);
            if (_tail is null)
            {
                _head = created;
            }
            _tail = created;
            if (_current is null)
            {
                _current = created;
            }
            getConnectionImpl().workUpdate(created);
            return created;
        }
        catch (RuntimeException e)
        {
            // The failure is deliberately not propagated (callers get null),
            // but when construction itself failed nothing was linked into
            // the list, so undo the queued count taken above to keep
            // getQueued() from drifting upwards.
            // NOTE(review): the previously commented-out "throw e" suggests
            // this handler once rethrew - confirm whether returning null is
            // really the intended contract.
            if (created is null)
            {
                decrementQueued();
            }
        }
        return null;
    }

    /**
     * Puts a Type.LINK_FINAL entry for this link on the connection and
     * releases the session reference taken in the constructor.
     */
    override
    void postFinal() {
        _session.getConnectionImpl().put(Type.LINK_FINAL, this);
        _session.decref();
    }

    /**
     * Frees every delivery reachable from the head and unregisters this
     * link from the connection.
     */
    override
    void doFree()
    {
        DeliveryImpl dlv = _head;
        while (dlv !is null) {
            // Fetch the successor first; free() is called on dlv afterwards.
            DeliveryImpl next = dlv.next();
            dlv.free();
            dlv = next;
        }

        _session.getConnectionImpl().removeLinkEndpoint(_node);
        _node = null;
    }

    /// Marks this endpoint as modified.
    void modifyEndpoints() {
        modified();
    }

    /*
     * Called when settling a message to ensure that the head/tail refs of the link are updated.
     * The caller ensures the delivery updates its own refs appropriately.
     */
    void remove(DeliveryImpl delivery)
    {
        // Identity comparison: we want "is this the very same object",
        // not whatever opEquals may define.
        if(_head is delivery)
        {
            _head = delivery.getLinkNext();
        }
        if(_tail is delivery)
        {
            _tail = delivery.getLinkPrevious();
        }
    }

    /// Returns the current delivery, or null if there is none.
    public DeliveryImpl current()
    {
        return _current;
    }

    /**
     * Moves the current-delivery reference to the next delivery on the link.
     *
     * The connection is notified (workUpdate) about the old current delivery
     * and, if there is one, about the new current delivery.
     *
     * Returns: true if there was a current delivery to advance from,
     *          false otherwise.
     */
    public bool advance()
    {
        if(_current !is null )
        {
            DeliveryImpl oldCurrent = _current;
            _current = _current.getLinkNext();
            getConnectionImpl().workUpdate(oldCurrent);

            if(_current !is null)
            {
                getConnectionImpl().workUpdate(_current);
            }
            return true;
        }
        else
        {
            return false;
        }

    }

    /// Returns the connection of the owning session.
    override
    public ConnectionImpl getConnectionImpl()
    {
        return _session.getConnectionImpl();
    }

    /// Returns the session this link was created on.
    public SessionImpl getSession()
    {
        return _session;
    }

    /// Returns the remote source last stored by setRemoteSource().
    public Source getRemoteSource()
    {
        return _remoteSource;
    }

    /// Stores the remote source.
    void setRemoteSource(Source source)
    {
        _remoteSource = source;
    }

    /// Returns the remote target last stored by setRemoteTarget().
    public Target getRemoteTarget()
    {
        return _remoteTarget;
    }

    /// Stores the remote target.
    void setRemoteTarget(Target target)
    {
        _remoteTarget = target;
    }

    /// Returns the local source.
    public Source getSource()
    {
        return _source;
    }

    /// Sets the local source.
    public void setSource(Source source)
    {
        // TODO - should be an error if local state is ACTIVE
        _source = source;
    }

    /// Returns the local target.
    public Target getTarget()
    {
        return _target;
    }

    /// Sets the local target.
    public void setTarget(Target target)
    {
        // TODO - should be an error if local state is ACTIVE
        _target = target;
    }

    /**
     * Returns the next link after this one that is accepted by an
     * EndpointImplQuery built from the given local and remote state sets,
     * or null if the node search yields nothing.
     */
    public Link next(Set!EndpointState local, Set!EndpointState remote)
    {
        Query!LinkImpl query = new EndpointImplQuery!LinkImpl(local, remote);

        LinkNode!LinkImpl linkNode = _node.next(query);

        return linkNode is null ? null : linkNode.getValue();

    }

    /// Returns the transport-level view of this link; supplied by subclasses.
    abstract TransportLink getTransportLink();

    /// Returns the current credit counter.
    public int getCredit()
    {
        return _credit;
    }

    /// Adds `credit` to the credit counter.
    public void addCredit(int credit)
    {
        _credit+=credit;
    }

    /// Overwrites the credit counter.
    public void setCredit(int credit)
    {
        _credit = credit;
    }

    /// Returns true while the credit counter is positive.
    bool hasCredit()
    {
        return _credit > 0;
    }

    /// Increases the credit counter by one.
    void incrementCredit()
    {
        _credit++;
    }

    /// Decreases the credit counter by one.
    void decrementCredit()
    {
        _credit--;
    }

    /// Returns the queued counter.
    public int getQueued()
    {
        return _queued;
    }

    /// Increases the queued counter by one.
    void incrementQueued()
    {
        _queued++;
    }

    /// Decreases the queued counter by one.
    void decrementQueued()
    {
        _queued--;
    }

    /// Returns the unsettled counter.
    public int getUnsettled()
    {
        return _unsettled;
    }

    /// Increases the unsettled counter by one.
    void incrementUnsettled()
    {
        _unsettled++;
    }

    /// Decreases the unsettled counter by one.
    void decrementUnsettled()
    {
        _unsettled--;
    }

    /// Sets the drain flag.
    void setDrain(bool drain)
    {
        _drain = drain;
    }

    /// Returns the drain flag.
    override
    public bool getDrain()
    {
        return _drain;
    }

    /// Returns the local sender settle mode.
    override
    public SenderSettleMode getSenderSettleMode()
    {
        return _senderSettleMode;
    }

    /// Sets the local sender settle mode.
    override
    public void setSenderSettleMode(SenderSettleMode senderSettleMode)
    {
        _senderSettleMode = senderSettleMode;
    }

    /// Returns the remote sender settle mode.
    override
    public SenderSettleMode getRemoteSenderSettleMode()
    {
        return _remoteSenderSettleMode;
    }

    /// Stores the remote sender settle mode.
    override
    public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode)
    {
        _remoteSenderSettleMode = remoteSenderSettleMode;
    }

    /// Returns the local receiver settle mode.
    override
    public ReceiverSettleMode getReceiverSettleMode()
    {
        return _receiverSettleMode;
    }

    /// Sets the local receiver settle mode.
    override
    public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode)
    {
        _receiverSettleMode = receiverSettleMode;
    }

    /// Returns the remote receiver settle mode.
    override
    public ReceiverSettleMode getRemoteReceiverSettleMode()
    {
        return _remoteReceiverSettleMode;
    }

    /// Stores the remote receiver settle mode.
    void setRemoteReceiverSettleMode(ReceiverSettleMode remoteReceiverSettleMode)
    {
        _remoteReceiverSettleMode = remoteReceiverSettleMode;
    }

    /// Returns the local properties map (the stored reference, not a copy).
    override
    public Map!(Symbol, Object) getProperties()
    {
        return _properties;
    }

    /// Sets the local properties map.
    override
    public void setProperties(Map!(Symbol, Object) properties)
    {
        _properties = properties;
    }

    /// Returns the remote properties map (the stored reference, not a copy).
    override
    public Map!(Symbol, Object) getRemoteProperties()
    {
        return _remoteProperties;
    }

    /// Stores the remote properties map.
    void setRemoteProperties(Map!(Symbol, Object) remoteProperties)
    {
        _remoteProperties = remoteProperties;
    }

    /// Returns the local desired capabilities.
    override
    public Symbol[] getDesiredCapabilities()
    {
        return _desiredCapabilities;
    }

    /// Sets the local desired capabilities.
    override
    public void setDesiredCapabilities(Symbol[] desiredCapabilities)
    {
        _desiredCapabilities = desiredCapabilities;
    }

    /// Returns the remote desired capabilities.
    override
    public Symbol[] getRemoteDesiredCapabilities()
    {
        return _remoteDesiredCapabilities;
    }

    /// Stores the remote desired capabilities.
    void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
    {
        _remoteDesiredCapabilities = remoteDesiredCapabilities;
    }

    /// Returns the local offered capabilities.
    override
    public Symbol[] getOfferedCapabilities()
    {
        return _offeredCapabilities;
    }

    /// Sets the local offered capabilities.
    override
    public void setOfferedCapabilities(Symbol[] offeredCapabilities)
    {
        _offeredCapabilities = offeredCapabilities;
    }

    /// Returns the remote offered capabilities.
    override
    public Symbol[] getRemoteOfferedCapabilities()
    {
        return _remoteOfferedCapabilities;
    }

    /// Stores the remote offered capabilities.
    void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
    {
        _remoteOfferedCapabilities = remoteOfferedCapabilities;
    }

    /// Returns the local maximum message size.
    override
    public UnsignedLong getMaxMessageSize()
    {
        return _maxMessageSize;
    }

    /// Sets the local maximum message size.
    override
    public void setMaxMessageSize(UnsignedLong maxMessageSize)
    {
        _maxMessageSize = maxMessageSize;
    }

    /// Returns the remote maximum message size.
    override
    public UnsignedLong getRemoteMaxMessageSize()
    {
        return _remoteMaxMessageSize;
    }

    /// Stores the remote maximum message size.
    void setRemoteMaxMessageSize(UnsignedLong remoteMaxMessageSize)
    {
        _remoteMaxMessageSize = remoteMaxMessageSize;
    }

    /**
     * Reports drained credit; behaviour depends on the concrete link type.
     *
     * When this object is a SenderImpl: if the drain flag is set and credit
     * is available, the whole credit is moved into the drained counter, the
     * credit is reset to 0, the endpoint is marked modified and the amount
     * is returned; otherwise 0 is returned.
     *
     * For any other link type: the drained counter is returned and reset
     * to 0.
     */
    override
    public int drained()
    {
        int drained = 0;

        // A successful downcast identifies the sending side.
        if (cast(SenderImpl)this !is null) {
            if(getDrain() && hasCredit())
            {
                _drained = getCredit();
                setCredit(0);
                modified();
                drained = _drained;
            }
        } else {
            drained = _drained;
            _drained = 0;
        }

        return drained;
    }

    /// Returns the drained counter without modifying it.
    int getDrained()
    {
        return _drained;
    }

    /// Overwrites the drained counter.
    void setDrained(int value)
    {
        _drained = value;
    }

    /// Returns the first delivery of this link's delivery list, or null.
    override
    public DeliveryImpl head()
    {
        return _head;
    }

    /// Puts a Type.LINK_LOCAL_OPEN entry for this link on the connection.
    override
    void localOpen()
    {
        getConnectionImpl().put(Type.LINK_LOCAL_OPEN, this);
    }

    /// Puts a Type.LINK_LOCAL_CLOSE entry for this link on the connection.
    override
    void localClose()
    {
        getConnectionImpl().put(Type.LINK_LOCAL_CLOSE, this);
    }

    /**
     * Flags the link as detached, puts a Type.LINK_LOCAL_DETACH entry for it
     * on the connection and marks the endpoint as modified.
     */
    override
    public void detach()
    {
        _detached = true;
        getConnectionImpl().put(Type.LINK_LOCAL_DETACH, this);
        modified();
    }

    /// Returns true once detach() has been called.
    public bool detached()
    {
        return _detached;
    }
}