/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.proton.engine.impl.TransportSession;

import hunt.collection.HashMap;
import hunt.collection.Map;

import hunt.proton.amqp.Binary;
import hunt.proton.amqp.UnsignedInteger;
import hunt.proton.amqp.transport.Disposition;
import hunt.proton.amqp.transport.Flow;
import hunt.proton.amqp.transport.Role;
import hunt.proton.amqp.transport.Transfer;
import hunt.proton.engine.Event;
import std.concurrency : initOnce;
import hunt.proton.engine.impl.TransportImpl;
import hunt.proton.engine.impl.SessionImpl;
import hunt.proton.engine.impl.TransportLink;
import hunt.proton.engine.impl.DeliveryImpl;
import hunt.proton.engine.impl.TransportReceiver;
import hunt.Exceptions;
import hunt.proton.engine.impl.ReceiverImpl;
import hunt.proton.engine.impl.TransportDelivery;
import hunt.Boolean;

/**
 * Transport-level state kept for one session: local/remote channel numbers,
 * handle-to-link tables, session window counters and the unsettled deliveries
 * indexed by delivery-id.
 */
class TransportSession
{
    /// Highest handle value handed out by allocateLocalHandle().
    private enum int HANDLE_MAX = 65535;

    /// Incoming window used when no incoming capacity or max frame size is configured.
    static UnsignedInteger DEFAULT_WINDOW_SIZE() {
        __gshared UnsignedInteger inst;
        return initOnce!inst(UnsignedInteger.valueOf(2147483647)); // biggest legal value
    }

    private TransportImpl _transport;
    private SessionImpl _session;
    private int _localChannel = -1;   // -1 means "not set"
    private int _remoteChannel = -1;  // -1 means "not set"
    private bool _openSent;
    private UnsignedInteger _handleMax; // TODO: should this be configurable?
    // This is used for the delivery-id actually stamped in each transfer frame of a given message delivery.
    private UnsignedInteger _outgoingDeliveryId;
    // These are used for the session windows communicated via Begin/Flow frames
    // and the conceptual transfer-id relating to updating them.
    private UnsignedInteger _incomingWindowSize;
    private UnsignedInteger _outgoingWindowSize;
    private UnsignedInteger _nextOutgoingId;
    private UnsignedInteger _nextIncomingId = null;

    private Map!(UnsignedInteger, TransportLink) _remoteHandlesMap;
    private Map!(UnsignedInteger, TransportLink) _localHandlesMap;
    private Map!(string, TransportLink) _halfOpenLinks;

    private UnsignedInteger _incomingDeliveryId = null;
    private UnsignedInteger _remoteIncomingWindow;
    private UnsignedInteger _remoteOutgoingWindow;
    private UnsignedInteger _remoteNextIncomingId;
    private UnsignedInteger _remoteNextOutgoingId;
    private Map!(UnsignedInteger, DeliveryImpl) _unsettledIncomingDeliveriesById;
    private Map!(UnsignedInteger, DeliveryImpl) _unsettledOutgoingDeliveriesById;
    private int _unsettledIncomingSize;
    private bool _endReceived;
    private bool _beginSent;

    /**
     * Creates the transport state for `session` on `transport`.
     *
     * The outgoing window is taken from the session; the incoming window and
     * the outgoing delivery-id start at zero and the next outgoing id at one.
     */
    this(TransportImpl transport, SessionImpl session)
    {
        _transport = transport;
        _session = session;
        _outgoingWindowSize = UnsignedInteger.valueOf(session.getOutgoingWindow());

        _handleMax = UnsignedInteger.valueOf(HANDLE_MAX);
        _outgoingDeliveryId = UnsignedInteger.ZERO;
        _incomingWindowSize = UnsignedInteger.ZERO;
        _nextOutgoingId = UnsignedInteger.ONE;

        _remoteNextIncomingId = _nextOutgoingId;
        _remoteHandlesMap = new HashMap!(UnsignedInteger, TransportLink)();
        _localHandlesMap = new HashMap!(UnsignedInteger, TransportLink)();
        _halfOpenLinks = new HashMap!(string, TransportLink)();
        _unsettledIncomingDeliveriesById = new HashMap!(UnsignedInteger, DeliveryImpl)();
        _unsettledOutgoingDeliveriesById = new HashMap!(UnsignedInteger, DeliveryImpl)();
    }

    /// Clears both the local and the remote channel (and their handle tables).
    void unbind()
    {
        unsetLocalChannel();
        unsetRemoteChannel();
    }

    /// Returns: the session this transport state belongs to.
    public SessionImpl getSession()
    {
        return _session;
    }

    /// Returns: the local channel number, or -1 when none is set.
    public int getLocalChannel()
    {
        return _localChannel;
    }

    /**
     * Records the local channel number.
     *
     * The session's reference count is incremented only when no local channel
     * was set before the call.
     */
    public void setLocalChannel(int localChannel)
    {
        if (!isLocalChannelSet()) {
            _session.incref();
        }
        _localChannel = localChannel;
    }

    /// Returns: the remote channel number, or -1 when none is set.
    public int getRemoteChannel()
    {
        return _remoteChannel;
    }

    /**
     * Records the remote channel number.
     *
     * The session's reference count is incremented only when no remote channel
     * was set before the call.
     */
    public void setRemoteChannel(int remoteChannel)
    {
        if (!isRemoteChannelSet()) {
            _session.incref();
        }
        _remoteChannel = remoteChannel;
    }

    /// Returns: the flag last stored through setOpenSent().
    public bool isOpenSent()
    {
        return _openSent;
    }

    /// Stores the "open sent" flag.
    public void setOpenSent(bool openSent)
    {
        _openSent = openSent;
    }

    /// Returns: true when a remote channel number is currently recorded.
    public bool isRemoteChannelSet()
    {
        return _remoteChannel != -1;
    }

    /// Returns: true when a local channel number is currently recorded.
    public bool isLocalChannelSet()
    {
        return _localChannel != -1;
    }

    /**
     * Forgets the local channel. When one was set, every link's local handle is
     * cleared first and the session's reference count is decremented.
     */
    public void unsetLocalChannel()
    {
        if (isLocalChannelSet()) {
            unsetLocalHandles();
            _session.decref();
        }
        _localChannel = -1;
    }

    /// Clears the local handle on every registered link and empties the table.
    private void unsetLocalHandles()
    {
        foreach (TransportLink link; _localHandlesMap.values())
        {
            link.clearLocalHandle();
        }
        _localHandlesMap.clear();
    }

    /**
     * Forgets the remote channel. When one was set, every link's remote handle
     * is cleared first and the session's reference count is decremented.
     */
    public void unsetRemoteChannel()
    {
        if (isRemoteChannelSet()) {
            unsetRemoteHandles();
            _session.decref();
        }
        _remoteChannel = -1;
    }

    /// Clears the remote handle on every registered link and empties the table.
    private void unsetRemoteHandles()
    {
        foreach (TransportLink link; _remoteHandlesMap.values())
        {
            link.clearRemoteHandle();
        }
        _remoteHandlesMap.clear();
    }

    /// Returns: the maximum handle value of this session.
    public UnsignedInteger getHandleMax()
    {
        return _handleMax;
    }

    /// Returns: the current incoming window size.
    public UnsignedInteger getIncomingWindowSize()
    {
        return _incomingWindowSize;
    }

    /**
     * Recomputes the incoming window.
     *
     * When either the session's incoming capacity or the transport's max frame
     * size is not positive, DEFAULT_WINDOW_SIZE is used; otherwise the window
     * is the remaining capacity (capacity minus incoming bytes) divided by the
     * frame size.
     */
    void updateIncomingWindow()
    {
        int incomingCapacity = _session.getIncomingCapacity();
        int size = _transport.getMaxFrameSize();
        if (incomingCapacity <= 0 || size <= 0) {
            _incomingWindowSize = DEFAULT_WINDOW_SIZE;
        } else {
            _incomingWindowSize = UnsignedInteger.valueOf((incomingCapacity - _session.getIncomingBytes())/size);
        }
    }

    /// Returns: the delivery-id to use for the next outgoing delivery.
    public UnsignedInteger getOutgoingDeliveryId()
    {
        return _outgoingDeliveryId;
    }

    /// Advances the outgoing delivery-id by one.
    void incrementOutgoingDeliveryId()
    {
        _outgoingDeliveryId = _outgoingDeliveryId.add(UnsignedInteger.ONE);
    }

    /// Returns: the outgoing window size taken from the session at construction.
    public UnsignedInteger getOutgoingWindowSize()
    {
        return _outgoingWindowSize;
    }

    /// Returns: the next outgoing transfer-id.
    public UnsignedInteger getNextOutgoingId()
    {
        return _nextOutgoingId;
    }

    /// Returns: the link registered under the given remote handle (the result of the map lookup).
    public TransportLink getLinkFromRemoteHandle(UnsignedInteger handle)
    {
        return _remoteHandlesMap.get(handle);
    }

    /**
     * Assigns the lowest unused local handle in 0 .. HANDLE_MAX to the link,
     * registers it in the local handle table and returns it.
     *
     * Throws: IllegalStateException when every handle is already in use.
     */
    public UnsignedInteger allocateLocalHandle(TransportLink transportLink)
    {
        for (int i = 0; i <= HANDLE_MAX; i++)
        {
            UnsignedInteger handle = UnsignedInteger.valueOf(i);
            if (!_localHandlesMap.containsKey(handle))
            {
                _localHandlesMap.put(handle, transportLink);
                transportLink.setLocalHandle(handle);
                return handle;
            }
        }
        throw new IllegalStateException("no local handle available for allocation");
    }

    /// Registers `link` under `remoteHandle` in the remote handle table.
    public void addLinkRemoteHandle(TransportLink link, UnsignedInteger remoteHandle)
    {
        _remoteHandlesMap.put(remoteHandle, link);
    }

    /// Registers `link` under `localhandle` in the local handle table.
    public void addLinkLocalHandle(TransportLink link, UnsignedInteger localhandle)
    {
        _localHandlesMap.put(localhandle, link);
    }

    /// Removes the entry for `handle` from the local handle table.
    public void freeLocalHandle(UnsignedInteger handle)
    {
        _localHandlesMap.remove(handle);
    }

    /// Removes the entry for `handle` from the remote handle table.
    public void freeRemoteHandle(UnsignedInteger handle)
    {
        _remoteHandlesMap.remove(handle);
    }

    /// Removes and returns the half-open link stored under `name`.
    public TransportLink resolveHalfOpenLink(string name)
    {
        return _halfOpenLinks.remove(name);
    }

    /// Stores `link` as half-open, keyed by its name.
    public void addHalfOpenLink(TransportLink link)
    {
        _halfOpenLinks.put(link.getName(), link);
    }

    /**
     * Processes one incoming Transfer frame.
     *
     * A frame that continues the link's current delivery is attached to it;
     * otherwise the delivery-id sequence is verified and a new delivery is
     * created and registered as unsettled. The payload (unless the transfer is
     * aborted) is appended, completion/abort and remote settlement are
     * recorded, the incoming window is decremented and a DELIVERY event is
     * queued on the connection.
     *
     * Throws: IllegalStateException when a new delivery carries no delivery-id,
     *         an out-of-sequence delivery-id, or is multiplexed on a link that
     *         still has a delivery in progress.
     */
    public void handleTransfer(Transfer transfer, Binary payload)
    {
        DeliveryImpl delivery;
        incrementNextIncomingId(); // The conceptual/non-wire transfer-id, for the session window.

        TransportReceiver transportReceiver = cast(TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
        UnsignedInteger linkIncomingDeliveryId = transportReceiver.getIncomingDeliveryId();
        UnsignedInteger deliveryId = transfer.getDeliveryId();

        if (linkIncomingDeliveryId !is null && (linkIncomingDeliveryId == (deliveryId) || deliveryId is null))
        {
            // Continuation frame of the delivery already in progress on this link.
            delivery = _unsettledIncomingDeliveriesById.get(linkIncomingDeliveryId);
            delivery.getTransportDelivery().incrementSessionSize();
        }
        else
        {
            verifyNewDeliveryIdSequence(_incomingDeliveryId, linkIncomingDeliveryId, deliveryId);

            _incomingDeliveryId = deliveryId;

            ReceiverImpl receiver = transportReceiver.getReceiver();
            Binary deliveryTag = transfer.getDeliveryTag();
            delivery = receiver.delivery(deliveryTag.getArray(), deliveryTag.getArrayOffset(),
                                         deliveryTag.getLength());
            UnsignedInteger messageFormat = transfer.getMessageFormat();
            if (messageFormat !is null) {
                delivery.setMessageFormat(messageFormat.intValue());
            }
            TransportDelivery transportDelivery = new TransportDelivery(deliveryId, delivery, transportReceiver);
            delivery.setTransportDelivery(transportDelivery);
            transportReceiver.setIncomingDeliveryId(deliveryId);
            _unsettledIncomingDeliveriesById.put(deliveryId, delivery);
            getSession().incrementIncomingDeliveries(1);
        }

        if (transfer.getState() !is null)
        {
            delivery.setRemoteDeliveryState(transfer.getState());
        }
        _unsettledIncomingSize++;

        bool aborted = transfer.getAborted().booleanValue;
        if (payload !is null && !aborted)
        {
            delivery.append(payload);
            getSession().incrementIncomingBytes(payload.getLength());
        }

        delivery.updateWork();

        if (!transfer.getMore().booleanValue || aborted)
        {
            // Last frame of the delivery: the link no longer has one in progress.
            transportReceiver.setIncomingDeliveryId(null);
            if (aborted) {
                delivery.setAborted();
            } else {
                delivery.setComplete();
            }

            delivery.getLink().getTransportLink().decrementLinkCredit();
            delivery.getLink().getTransportLink().incrementDeliveryCount();
        }

        if (Boolean.TRUE == (transfer.getSettled()) || aborted)
        {
            delivery.setRemoteSettled(true);
        }

        _incomingWindowSize = _incomingWindowSize.subtract(UnsignedInteger.ONE);

        // this will cause a flow to happen
        if (_incomingWindowSize == (UnsignedInteger.ZERO)) {
            delivery.getLink().modified(false);
        }

        getSession().getConnection().put(Type.DELIVERY, delivery);
    }

    /// Renders the 32 bits returned by UnsignedInteger.intValue() as an unsigned decimal string.
    private static string idText(int rawValue)
    {
        import std.conv : to;
        return to!string(cast(uint) rawValue);
    }

    /**
     * Checks that `newDeliveryId` is acceptable as the id of a new delivery.
     *
     * Params:
     *   previousId     = id of the previous new delivery on this session, or null
     *   linkIncomingId = id of the delivery still in progress on the link, or null
     *   newDeliveryId  = delivery-id carried by the transfer, or null
     *
     * Throws: IllegalStateException when the id is missing, is not the
     *         successor of `previousId`, or the link still has a delivery in
     *         progress.
     */
    private void verifyNewDeliveryIdSequence(UnsignedInteger previousId, UnsignedInteger linkIncomingId, UnsignedInteger newDeliveryId) {
        if (newDeliveryId is null) {
            throw new IllegalStateException("No delivery-id specified on first Transfer of new delivery");
        }

        // Doing a primitive comparison, uses intValue() since its a uint sequence
        // and we need the primitive values to wrap appropriately during comparison.
        if (previousId !is null && previousId.intValue() + 1 != newDeliveryId.intValue()) {
            throw new IllegalStateException("Expected delivery-id " ~ idText(previousId.intValue() + 1)
                    ~ ", got " ~ idText(newDeliveryId.intValue()));
        }

        if (linkIncomingId !is null) {
            throw new IllegalStateException("Illegal multiplex of deliveries on same link with delivery-id "
                    ~ idText(newDeliveryId.intValue()) ~ " and " ~ idText(linkIncomingId.intValue()));
        }
    }

    /// Same as unsetLocalChannel().
    public void freeLocalChannel()
    {
        unsetLocalChannel();
    }

    /// Same as unsetRemoteChannel().
    public void freeRemoteChannel()
    {
        unsetRemoteChannel();
    }

    private void setRemoteIncomingWindow(UnsignedInteger incomingWindow)
    {
        _remoteIncomingWindow = incomingWindow;
    }

    /// Reduces the remote incoming window by one.
    void decrementRemoteIncomingWindow()
    {
        _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
    }

    private void setRemoteOutgoingWindow(UnsignedInteger outgoingWindow)
    {
        _remoteOutgoingWindow = outgoingWindow;
    }

    /**
     * Processes an incoming Flow frame.
     *
     * When the frame carries a next-incoming-id, the remote incoming window is
     * computed as next-incoming-id + incoming-window - our next outgoing id;
     * otherwise the frame's incoming-window is used as is. The remote
     * next-outgoing-id and outgoing window are stored, and if the frame names
     * a handle it is forwarded to that link.
     */
    void handleFlow(Flow flow)
    {
        UnsignedInteger inext = flow.getNextIncomingId();
        UnsignedInteger iwin = flow.getIncomingWindow();

        if (inext !is null)
        {
            setRemoteNextIncomingId(inext);
            setRemoteIncomingWindow(inext.add(iwin).subtract(_nextOutgoingId));
        }
        else
        {
            setRemoteIncomingWindow(iwin);
        }
        setRemoteNextOutgoingId(flow.getNextOutgoingId());
        setRemoteOutgoingWindow(flow.getOutgoingWindow());

        if (flow.getHandle() !is null)
        {
            TransportLink transportLink = getLinkFromRemoteHandle(flow.getHandle());
            transportLink.handleFlow(flow);
        }
    }

    private void setRemoteNextOutgoingId(UnsignedInteger nextOutgoingId)
    {
        _remoteNextOutgoingId = nextOutgoingId;
    }

    private void setRemoteNextIncomingId(UnsignedInteger remoteNextIncomingId)
    {
        _remoteNextIncomingId = remoteNextIncomingId;
    }

    /**
     * Processes an incoming Disposition frame covering delivery-ids
     * first..last (last defaults to first when absent).
     *
     * A disposition sent in the RECEIVER role is applied to the unsettled
     * outgoing deliveries, any other role to the unsettled incoming ones. Each
     * known delivery in the range gets the remote state (when present) and, if
     * the frame is settled, is marked remotely settled and dropped from the
     * unsettled table; a DELIVERY event is queued for it.
     */
    void handleDisposition(Disposition disposition)
    {
        UnsignedInteger id = disposition.getFirst();
        UnsignedInteger last = disposition.getLast() is null ? id : disposition.getLast();
        Map!(UnsignedInteger, DeliveryImpl) unsettledDeliveries =
                disposition.getRole() == Role.RECEIVER ? _unsettledOutgoingDeliveriesById
                                                       : _unsettledIncomingDeliveriesById;

        while (id <= (last))
        {
            DeliveryImpl delivery = unsettledDeliveries.get(id);
            if (delivery !is null)
            {
                if (disposition.getState() !is null)
                {
                    delivery.setRemoteDeliveryState(disposition.getState());
                }
                if (Boolean.TRUE == (disposition.getSettled()))
                {
                    delivery.setRemoteSettled(true);
                    unsettledDeliveries.remove(id);
                }
                delivery.updateWork();

                getSession().getConnection().put(Type.DELIVERY, delivery);
            }

            // Stop once `last` itself has been handled. Without this, a `last`
            // equal to the largest representable id could never make the loop
            // condition false if add() wraps around to zero.
            if (id == last)
            {
                break;
            }
            id = id.add(UnsignedInteger.ONE);
        }
        //TODO - Implement.
    }

    /// Registers an outgoing delivery as unsettled under `deliveryId`.
    void addUnsettledOutgoing(UnsignedInteger deliveryId, DeliveryImpl delivery)
    {
        _unsettledOutgoingDeliveriesById.put(deliveryId, delivery);
    }

    /// Returns: true when a remote incoming window is known and greater than zero.
    public bool hasOutgoingCredit()
    {
        return _remoteIncomingWindow !is null
            && _remoteIncomingWindow > (UnsignedInteger.ZERO);
    }

    /// Advances the next outgoing transfer-id by one.
    void incrementOutgoingId()
    {
        _nextOutgoingId = _nextOutgoingId.add(UnsignedInteger.ONE);
    }

    /**
     * Drops a settled delivery from the matching unsettled table (incoming when
     * its link is a ReceiverImpl, outgoing otherwise) and marks the session as
     * modified.
     */
    public void settled(TransportDelivery transportDelivery)
    {
        if (cast(ReceiverImpl) (transportDelivery.getTransportLink().getLink()) !is null)
        {
            _unsettledIncomingDeliveriesById.remove(transportDelivery.getDeliveryId());
        }
        else
        {
            _unsettledOutgoingDeliveriesById.remove(transportDelivery.getDeliveryId());
        }
        getSession().modified(false);
    }

    /// Returns: the next expected incoming transfer-id (null until set).
    public UnsignedInteger getNextIncomingId()
    {
        return _nextIncomingId;
    }

    /// Stores the next expected incoming transfer-id.
    public void setNextIncomingId(UnsignedInteger nextIncomingId)
    {
        _nextIncomingId = nextIncomingId;
    }

    /// Advances the next expected incoming transfer-id by one.
    public void incrementNextIncomingId()
    {
        _nextIncomingId = _nextIncomingId.add(UnsignedInteger.ONE);
    }

    /// Returns: true once receivedEnd() has been called.
    public bool endReceived()
    {
        return _endReceived;
    }

    /// Records that an End has been received.
    public void receivedEnd()
    {
        _endReceived = true;
    }

    /// Returns: true once sentBegin() has been called.
    public bool beginSent()
    {
        return _beginSent;
    }

    /// Records that a Begin has been sent.
    public void sentBegin()
    {
        _beginSent = true;
    }
}