1 /* 2 * hunt-proton: AMQP Protocol library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.proton.engine.impl.TransportImpl; 13 14 import hunt.logging; 15 import std.conv:to; 16 import hunt.proton.engine.impl.ByteBufferUtils; 17 import hunt.Boolean; 18 import hunt.io.ByteBuffer; 19 import hunt.collection.ArrayList; 20 import hunt.collection.HashMap; 21 import hunt.collection.List; 22 import hunt.collection.Map; 23 import hunt.collection.LinkedHashMap; 24 25 import hunt.proton.amqp.Binary; 26 import hunt.proton.amqp.Symbol; 27 import hunt.proton.amqp.UnsignedInteger; 28 import hunt.proton.amqp.UnsignedShort; 29 import hunt.proton.amqp.security.SaslFrameBody; 30 import hunt.proton.amqp.transport.Attach; 31 import hunt.proton.amqp.transport.Begin; 32 import hunt.proton.amqp.transport.Close; 33 import hunt.proton.amqp.transport.ConnectionError; 34 import hunt.proton.amqp.transport.DeliveryState; 35 import hunt.proton.amqp.transport.Detach; 36 import hunt.proton.amqp.transport.Disposition; 37 import hunt.proton.amqp.transport.End; 38 import hunt.proton.amqp.transport.ErrorCondition; 39 import hunt.proton.amqp.transport.Flow; 40 import hunt.proton.amqp.transport.FrameBody; 41 import hunt.proton.amqp.transport.Open; 42 import hunt.proton.amqp.transport.Role; 43 import hunt.proton.amqp.transport.Transfer; 44 import hunt.proton.codec.AMQPDefinedTypes; 45 import hunt.proton.codec.DecoderImpl; 46 import hunt.proton.codec.EncoderImpl; 47 import hunt.proton.codec.ReadableBuffer; 48 import hunt.proton.engine.Connection; 49 import hunt.proton.engine.EndpointState; 50 import hunt.proton.engine.Event; 51 import hunt.proton.engine.ProtonJTransport; 52 import hunt.proton.engine.Sasl; 53 import hunt.proton.engine.Ssl; 54 import hunt.proton.engine.SslDomain; 55 import hunt.proton.engine.SslPeerDetails; 56 import 
hunt.proton.engine.TransportException; 57 import hunt.proton.engine.TransportResult; 58 import hunt.proton.engine.TransportResultFactory; 59 import hunt.proton.engine.impl.ssl.SslImpl; 60 import hunt.proton.framing.TransportFrame; 61 import hunt.proton.engine.Reactor; 62 import hunt.proton.engine.Selectable; 63 import hunt.proton.engine.impl.EndpointImpl; 64 import hunt.proton.engine.impl.FrameHandler; 65 import hunt.proton.engine.impl.TransportOutputWriter; 66 import hunt.proton.engine.impl.TransportInternal; 67 import hunt.Integer; 68 import hunt.proton.engine.impl.FrameParser; 69 import hunt.proton.engine.impl.ConnectionImpl; 70 import hunt.proton.engine.impl.TransportSession; 71 import hunt.proton.engine.impl.TransportInput; 72 import hunt.proton.engine.impl.TransportOutput; 73 import hunt.proton.engine.impl.FrameWriter; 74 import hunt.proton.engine.impl.SaslImpl; 75 import hunt.proton.engine.impl.Ref; 76 import hunt.proton.engine.impl.ProtocolTracer; 77 import hunt.proton.engine.Transport; 78 import hunt.proton.engine.impl.TransportLayer; 79 import hunt.proton.engine.impl.TransportWrapper; 80 import hunt.Exceptions; 81 import hunt.proton.engine.impl.LinkImpl; 82 import hunt.proton.engine.impl.TransportLink; 83 import hunt.proton.engine.impl.SessionImpl; 84 import hunt.proton.engine.impl.SenderImpl; 85 import hunt.proton.engine.impl.TransportSender; 86 import hunt.proton.engine.impl.DeliveryImpl; 87 import hunt.proton.engine.impl.TransportDelivery; 88 import hunt.Boolean; 89 import hunt.util.Common; 90 import hunt.proton.engine.impl.ReceiverImpl; 91 import hunt.proton.engine.impl.AmqpHeader; 92 import hunt.proton.engine.Event; 93 import hunt.proton.engine.impl.TransportOutputAdaptor; 94 import hunt.String; 95 import std.algorithm; 96 import hunt.proton.amqp.transport.Attach; 97 import hunt.proton.amqp.transport.Open; 98 import hunt.proton.amqp.transport.Begin; 99 import hunt.proton.amqp.transport.Detach; 100 import hunt.proton.amqp.transport.Close; 101 import 
hunt.proton.amqp.transport.Flow; 102 import hunt.proton.amqp.transport.End; 103 import hunt.proton.amqp.transport.Transfer; 104 import hunt.proton.amqp.transport.EmptyFrame; 105 import hunt.proton.amqp.transport.Disposition; 106 107 import hunt.Exceptions; 108 import hunt.util.Runnable; 109 110 class TransportImpl : EndpointImpl, ProtonJTransport, FrameBodyHandler!int, 111 FrameHandler, TransportOutputWriter, TransportInternal 112 { 113 // static int BUFFER_RELEASE_THRESHOLD = Integer.getInteger("proton.transport_buffer_release_threshold", 2 * 1024 * 1024); 114 private static int CHANNEL_MAX_LIMIT = 65535; 115 static int BUFFER_RELEASE_THRESHOLD = 2 * 1024 * 1024; 116 //private static boolean getBooleanEnv(String name) 117 //{ 118 // String value = System.getenv(name); 119 // return "true".equalsIgnoreCase(value) || 120 // "1".equals(value) || 121 // "yes".equalsIgnoreCase(value); 122 //} 123 124 private static bool FRM_ENABLED = false; 125 //private static boolean FRM_ENABLED = getBooleanEnv("PN_TRACE_FRM"); 126 private static int TRACE_FRAME_PAYLOAD_LENGTH = 1024; 127 // private static int TRACE_FRAME_PAYLOAD_LENGTH = Integer.getInteger("proton.trace_frame_payload_length", 1024); 128 private static string HEADER_DESCRIPTION = "AMQP"; 129 130 // trace levels 131 private int _levels = 0; 132 133 private FrameParser _frameParser; 134 135 private ConnectionImpl _connectionEndpoint; 136 137 private bool _isOpenSent; 138 private bool _isCloseSent; 139 140 private bool _headerWritten; 141 private Map!(int, TransportSession) _remoteSessions ;//= new HashMap<Integer, TransportSession>(); 142 private Map!(int, TransportSession) _localSessions ; //= new HashMap<Integer, TransportSession>(); 143 144 private TransportInput _inputProcessor; 145 private TransportOutput _outputProcessor; 146 147 private DecoderImpl _decoder ;//= new DecoderImpl(); 148 private EncoderImpl _encoder ;//= new EncoderImpl(_decoder); 149 150 private int _maxFrameSize ;//= 
Transport.DEFAULT_MAX_FRAME_SIZE; 151 private int _remoteMaxFrameSize ;// = Transport.MIN_MAX_FRAME_SIZE; 152 private int _outboundFrameSizeLimit = 0; 153 private int _channelMax ;// = CHANNEL_MAX_LIMIT; 154 private int _remoteChannelMax ;// = CHANNEL_MAX_LIMIT; 155 156 private FrameWriter _frameWriter; 157 158 private bool _closeReceived; 159 private Open _open; 160 private SaslImpl _sasl; 161 //private SslImpl _ssl; 162 private Ref!ProtocolTracer _protocolTracer ;//= new Ref<>(null); 163 164 private TransportResult _lastTransportResult ;//= TransportResultFactory.ok(); 165 166 private bool _init; 167 private bool _processingStarted; 168 private bool _emitFlowEventOnSend = true; 169 private bool _useReadOnlyOutputBuffer = true; 170 171 private FrameHandler _frameHandler ;//= this; 172 private bool _head_closed = false; 173 private bool _conditionSet; 174 175 private bool postedHeadClosed = false; 176 private bool postedTailClosed = false; 177 private bool postedTransportError = false; 178 179 private int _localIdleTimeout = 0; 180 private int _remoteIdleTimeout = 0; 181 private long _bytesInput = 0; 182 private long _bytesOutput = 0; 183 private long _localIdleDeadline = 0; 184 private long _lastBytesInput = 0; 185 private long _lastBytesOutput = 0; 186 private long _remoteIdleDeadline = 0; 187 188 private Selectable _selectable; 189 private Reactor _reactor; 190 191 private List!TransportLayer _additionalTransportLayers; 192 193 // Cached instances used to carry the Performatives to the frame writer without the need to create 194 // a new instance on each operation that triggers a write 195 private Disposition cachedDisposition ;//= new Disposition(); 196 private Flow cachedFlow ;//= new Flow(); 197 private Transfer cachedTransfer ;//= new Transfer(); 198 199 /** 200 * Application code should use {@link hunt.proton.engine.Transport.Factory#create()} instead 201 */ 202 this() 203 { 204 this(Transport.DEFAULT_MAX_FRAME_SIZE); 205 } 206 207 /** 208 * Creates a 
transport with the given maximum frame size. 209 * Note that the maximumFrameSize also determines the size of the output buffer. 210 */ 211 this(int maxFrameSize) 212 { 213 _decoder = new DecoderImpl; 214 _encoder = new EncoderImpl(_decoder); 215 _remoteSessions = new LinkedHashMap!(int, TransportSession); 216 _localSessions = new LinkedHashMap!(int, TransportSession); 217 _protocolTracer = new Ref!ProtocolTracer(null); 218 _lastTransportResult = TransportResultFactory.ok(); 219 cachedTransfer = new Transfer(); 220 cachedFlow = new Flow(); 221 cachedDisposition = new Disposition(); 222 _remoteMaxFrameSize = Transport.MIN_MAX_FRAME_SIZE; 223 _maxFrameSize = Transport.DEFAULT_MAX_FRAME_SIZE; 224 _channelMax = CHANNEL_MAX_LIMIT; 225 _remoteChannelMax = CHANNEL_MAX_LIMIT; 226 AMQPDefinedTypes.registerAllTypes(_decoder, _encoder); 227 228 _maxFrameSize = maxFrameSize; 229 _frameWriter = new FrameWriter(_encoder, _remoteMaxFrameSize, 230 FrameWriter.AMQP_FRAME_TYPE, 231 this); 232 _frameHandler = this; 233 } 234 235 private void init() 236 { 237 if(!_init) 238 { 239 _init = true; 240 _frameParser = new FrameParser(_frameHandler , _decoder, _maxFrameSize, this); 241 _inputProcessor = _frameParser; 242 _outputProcessor = new TransportOutputAdaptor(this, _maxFrameSize, isUseReadOnlyOutputBuffer()); 243 } 244 } 245 246 public void trace(int levels) { 247 _levels = levels; 248 } 249 250 public int getMaxFrameSize() 251 { 252 return _maxFrameSize; 253 } 254 255 public int getRemoteMaxFrameSize() 256 { 257 return _remoteMaxFrameSize; 258 } 259 260 public void setInitialRemoteMaxFrameSize(int remoteMaxFrameSize) 261 { 262 if(_init) 263 { 264 throw new IllegalStateException("Cannot set initial remote max frame size after transport has been initialised"); 265 } 266 267 _remoteMaxFrameSize = remoteMaxFrameSize; 268 } 269 270 public void setMaxFrameSize(int maxFrameSize) 271 { 272 if(_init) 273 { 274 throw new IllegalStateException("Cannot set max frame size after transport has been 
initialised"); 275 } 276 _maxFrameSize = maxFrameSize; 277 } 278 279 public int getChannelMax() 280 { 281 return _channelMax; 282 } 283 284 public void setChannelMax(int channelMax) 285 { 286 if(_isOpenSent) 287 { 288 throw new IllegalArgumentException("Cannot change channel max after open frame has been sent"); 289 } 290 291 if(channelMax < 0 || channelMax >= (1<<16)) 292 { 293 throw new NumberFormatException("Value \"" ~ to!string(channelMax) ~"\" lies outside the range [0-" ~ to!string((1<<16) )~")."); 294 } 295 296 _channelMax = channelMax; 297 } 298 299 override 300 public int getRemoteChannelMax() 301 { 302 return _remoteChannelMax; 303 } 304 305 override 306 public ErrorCondition getCondition() 307 { 308 // Get the ErrorCondition, but only return it if its condition field is populated. 309 // This somewhat retains prior TransportImpl behaviour of returning null when no 310 // condition had been set (by TransportImpl itself) rather than the 'empty' ErrorCondition 311 // object historically used in the other areas. 312 ErrorCondition errorCondition = super.getCondition(); 313 return errorCondition.getCondition() !is null ? 
errorCondition : null; 314 } 315 316 override 317 public void setCondition(ErrorCondition error) 318 { 319 super.setCondition(error); 320 _conditionSet = error !is null && error.getCondition() !is null; 321 } 322 323 override 324 public void bind(Connection conn) 325 { 326 // TODO - check if already bound 327 328 _connectionEndpoint = cast(ConnectionImpl) conn; 329 put(Type.CONNECTION_BOUND, cast(Object)conn); 330 _connectionEndpoint.setTransport(this); 331 _connectionEndpoint.incref(); 332 333 if(getRemoteState() != EndpointState.UNINITIALIZED) 334 { 335 _connectionEndpoint.handleOpen(_open); 336 if(getRemoteState() == EndpointState.CLOSED) 337 { 338 _connectionEndpoint.setRemoteState(EndpointState.CLOSED); 339 } 340 341 _frameParser.flush(); 342 } 343 } 344 345 346 FrameBody copy() 347 { 348 implementationMissing(false); 349 return null; 350 } 351 352 override 353 public void unbind() 354 { 355 foreach (TransportSession ts ;_localSessions.values()) { 356 ts.unbind(); 357 } 358 foreach (TransportSession ts ;_remoteSessions.values()) { 359 ts.unbind(); 360 } 361 362 put(Type.CONNECTION_UNBOUND, _connectionEndpoint); 363 364 _connectionEndpoint.modifyEndpoints(); 365 _connectionEndpoint.setTransport(null); 366 _connectionEndpoint.decref(); 367 } 368 369 override 370 public int input(byte[] bytes, int offset, int length) 371 { 372 oldApiCheckStateBeforeInput(length).checkIsOk(); 373 374 ByteBuffer inputBuffer = getInputBuffer(); 375 int numberOfBytesConsumed = ByteBufferUtils.pourArrayToBuffer(bytes, offset, length, inputBuffer); 376 processInput().checkIsOk(); 377 return numberOfBytesConsumed; 378 } 379 380 /** 381 * This method is public as it is used by Python layer. 
382 * @see hunt.proton.engine.Transport#input(byte[], int, int) 383 */ 384 public TransportResult oldApiCheckStateBeforeInput(int inputLength) 385 { 386 _lastTransportResult.checkIsOk(); 387 if(inputLength == 0) 388 { 389 if(_connectionEndpoint is null || _connectionEndpoint.getRemoteState() != EndpointState.CLOSED) 390 { 391 return TransportResultFactory.error(new TransportException("Unexpected EOS when remote connection not closed: connection aborted")); 392 } 393 } 394 return TransportResultFactory.ok(); 395 } 396 397 //================================================================================================================== 398 // Process model state to generate output 399 400 override 401 public int output(byte[] bytes, int offset, int size) 402 { 403 ByteBuffer outputBuffer = getOutputBuffer(); 404 int numberOfBytesOutput = ByteBufferUtils.pourBufferToArray(outputBuffer, bytes, offset, size); 405 outputConsumed(); 406 return numberOfBytesOutput; 407 } 408 409 override 410 public bool writeInto(ByteBuffer outputBuffer) 411 { 412 processHeader(); 413 processOpen(); 414 processBegin(); 415 processAttach(); 416 processReceiverFlow(); 417 // we process transport work twice intentionally, the first 418 // pass may end up settling deliveries that the second pass 419 // can clean up 420 processTransportWork(); 421 processTransportWork(); 422 processSenderFlow(); 423 processDetach(); 424 processEnd(); 425 processClose(); 426 427 _frameWriter.readBytes(outputBuffer); 428 429 return _isCloseSent || _head_closed; 430 } 431 432 override 433 public Sasl sasl() 434 { 435 if(_sasl is null) 436 { 437 if(_processingStarted) 438 { 439 throw new IllegalStateException("Sasl can't be initiated after transport has started processing"); 440 } 441 442 init(); 443 _sasl = new SaslImpl(this, _remoteMaxFrameSize); 444 TransportWrapper transportWrapper = _sasl.wrap(_inputProcessor, _outputProcessor); 445 _inputProcessor = transportWrapper; 446 _outputProcessor = transportWrapper; 
447 } 448 return _sasl; 449 450 } 451 452 /** 453 * {@inheritDoc} 454 * 455 * <p>Note that sslDomain must implement {@link hunt.proton.engine.impl.ssl.ProtonSslEngineProvider}. 456 * This is not possible enforce at the API level because {@link hunt.proton.engine.impl.ssl.ProtonSslEngineProvider} is not part of the 457 * public Proton API.</p> 458 */ 459 override 460 public Ssl ssl(SslDomain sslDomain, SslPeerDetails sslPeerDetails) 461 { 462 implementationMissing(false); 463 return null; 464 //if (_ssl is null) 465 //{ 466 // init(); 467 // _ssl = new SslImpl(sslDomain, sslPeerDetails); 468 // TransportWrapper transportWrapper = _ssl.wrap(_inputProcessor, _outputProcessor); 469 // _inputProcessor = transportWrapper; 470 // _outputProcessor = transportWrapper; 471 //} 472 //return _ssl; 473 } 474 475 override 476 public Ssl ssl(SslDomain sslDomain) 477 { 478 return ssl(sslDomain, null); 479 } 480 481 private void processDetach() 482 { 483 // logInfo("processDetach out -------------------------------------"); 484 if(_connectionEndpoint !is null && _isOpenSent) 485 { 486 EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); 487 while(endpoint !is null) 488 { 489 LinkImpl link = cast(LinkImpl)endpoint; 490 if(link !is null) 491 { 492 TransportLink transportLink = getTransportState(link); 493 SessionImpl session = link.getSession(); 494 TransportSession transportSession = getTransportState(session); 495 // logInfo("processDetach in -------------------------------------"); 496 if(((link.getLocalState() == EndpointState.CLOSED) || link.detached()) 497 && transportLink.isLocalHandleSet() 498 && transportSession.isLocalChannelSet() 499 && !_isCloseSent) 500 { 501 if((cast(SenderImpl)link !is null) 502 && link.getQueued() > 0 503 && !transportLink.detachReceived() 504 && !transportSession.endReceived() 505 && !_closeReceived) { 506 endpoint = endpoint.transportNext(); 507 continue; 508 } 509 510 UnsignedInteger localHandle = transportLink.getLocalHandle(); 511 
transportLink.clearLocalHandle(); 512 transportSession.freeLocalHandle(localHandle); 513 514 Detach detach = new Detach(); 515 detach.setHandle(localHandle); 516 detach.setClosed(new Boolean(!link.detached())); 517 518 ErrorCondition localError = link.getCondition(); 519 if( localError.getCondition() !is null ) 520 { 521 detach.setError(localError); 522 } 523 524 writeFrame(transportSession.getLocalChannel(), detach, null, null); 525 } 526 527 endpoint.clearModified(); 528 529 } 530 endpoint = endpoint.transportNext(); 531 } 532 } 533 } 534 535 private void writeFlow(TransportSession ssn, TransportLink link) 536 { 537 cachedFlow.setNextIncomingId(ssn.getNextIncomingId()); 538 cachedFlow.setNextOutgoingId(ssn.getNextOutgoingId()); 539 ssn.updateIncomingWindow(); 540 cachedFlow.setIncomingWindow(ssn.getIncomingWindowSize()); 541 cachedFlow.setOutgoingWindow(ssn.getOutgoingWindowSize()); 542 cachedFlow.setProperties(null); 543 if (link !is null) { 544 cachedFlow.setHandle(link.getLocalHandle()); 545 cachedFlow.setDeliveryCount(link.getDeliveryCount()); 546 cachedFlow.setLinkCredit(link.getLinkCredit()); 547 cachedFlow.setDrain(new Boolean((cast(LinkImpl)link.getLink()).getDrain())); 548 } else { 549 cachedFlow.setHandle(null); 550 cachedFlow.setDeliveryCount(null); 551 cachedFlow.setLinkCredit(null); 552 cachedFlow.setDrain(Boolean.FALSE); 553 } 554 writeFrame(ssn.getLocalChannel(), cachedFlow, null, null); 555 } 556 557 private void processSenderFlow() 558 { 559 if(_connectionEndpoint !is null && _isOpenSent && !_isCloseSent) 560 { 561 EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); 562 while(endpoint !is null) 563 { 564 SenderImpl sender = cast(SenderImpl) endpoint; 565 if(sender !is null) 566 { 567 if(sender.getDrain() && sender.getDrained() > 0) 568 { 569 TransportSender transportLink = sender.getTransportLink(); 570 TransportSession transportSession = sender.getSession().getTransportSession(); 571 UnsignedInteger credits = 
transportLink.getLinkCredit(); 572 transportLink.setLinkCredit(UnsignedInteger.ZERO); 573 transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(credits)); 574 sender.setDrained(0); 575 576 writeFlow(transportSession, transportLink); 577 } 578 } 579 580 endpoint = endpoint.transportNext(); 581 } 582 } 583 } 584 585 private void processTransportWork() 586 { 587 if(_connectionEndpoint !is null && _isOpenSent && !_isCloseSent) 588 { 589 DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead(); 590 while(delivery !is null) 591 { 592 LinkImpl link = delivery.getLink(); 593 if (cast(SenderImpl)link !is null) { 594 if (processTransportWorkSender(delivery, cast(SenderImpl) link)) { 595 delivery = delivery.clearTransportWork(); 596 } else { 597 delivery = delivery.getTransportWorkNext(); 598 } 599 } else { 600 if (processTransportWorkReceiver(delivery, cast(ReceiverImpl) link)) { 601 delivery = delivery.clearTransportWork(); 602 } else { 603 delivery = delivery.getTransportWorkNext(); 604 } 605 } 606 } 607 } 608 } 609 610 private bool processTransportWorkSender(DeliveryImpl delivery, 611 SenderImpl snd) 612 { 613 TransportSender tpLink = snd.getTransportLink(); 614 SessionImpl session = snd.getSession(); 615 TransportSession tpSession = session.getTransportSession(); 616 617 bool wasDone = delivery.isDone(); 618 619 if(!delivery.isDone() && 620 (delivery.getDataLength() > 0 || delivery != snd.current()) && 621 tpSession.hasOutgoingCredit() && tpLink.hasCredit() && 622 tpSession.isLocalChannelSet() && 623 tpLink.getLocalHandle() !is null && !_frameWriter.isFull()) 624 { 625 DeliveryImpl inProgress = tpLink.getInProgressDelivery(); 626 if(inProgress !is null){ 627 // There is an existing Delivery awaiting completion. Check it 628 // is the same Delivery object given and return if not, as we 629 // can't interleave Transfer frames for deliveries on a link. 
630 if(inProgress != delivery) { 631 return false; 632 } 633 } 634 635 TransportDelivery tpDelivery = delivery.getTransportDelivery(); 636 UnsignedInteger deliveryId; 637 if (tpDelivery !is null) { 638 deliveryId = tpDelivery.getDeliveryId(); 639 } else { 640 deliveryId = tpSession.getOutgoingDeliveryId(); 641 tpSession.incrementOutgoingDeliveryId(); 642 } 643 tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink); 644 delivery.setTransportDelivery(tpDelivery); 645 646 cachedTransfer.setDeliveryId(deliveryId); 647 cachedTransfer.setDeliveryTag(new Binary(delivery.getTag())); 648 cachedTransfer.setHandle(tpLink.getLocalHandle()); 649 cachedTransfer.setRcvSettleMode(null); 650 cachedTransfer.setResume(Boolean.FALSE); // Ensure default is written 651 cachedTransfer.setAborted(Boolean.FALSE); // Ensure default is written 652 cachedTransfer.setBatchable(Boolean.FALSE); // Ensure default is written 653 654 if(delivery.getLocalState() !is null) 655 { 656 cachedTransfer.setState(delivery.getLocalState()); 657 } 658 else 659 { 660 cachedTransfer.setState(null); 661 } 662 663 if(delivery.isSettled()) 664 { 665 cachedTransfer.setSettled(Boolean.TRUE); 666 } 667 else 668 { 669 cachedTransfer.setSettled(Boolean.FALSE); 670 tpSession.addUnsettledOutgoing(deliveryId, delivery); 671 } 672 673 if(snd.current() == delivery) 674 { 675 cachedTransfer.setMore(Boolean.TRUE); 676 } 677 else 678 { 679 // Partial transfers will reset this as needed to true in the FrameWriter 680 cachedTransfer.setMore(Boolean.FALSE); 681 } 682 683 int messageFormat = delivery.getMessageFormat(); 684 if(messageFormat == DeliveryImpl.DEFAULT_MESSAGE_FORMAT) { 685 cachedTransfer.setMessageFormat(UnsignedInteger.ZERO); 686 } else { 687 cachedTransfer.setMessageFormat(UnsignedInteger.valueOf(messageFormat)); 688 } 689 690 ReadableBuffer payload = delivery.getData(); 691 692 int pending = payload.remaining(); 693 //logInfo("pending !!! 
%d",pending); 694 695 try { 696 writeFrame(tpSession.getLocalChannel(), cachedTransfer, payload, new class Runnable { 697 void run() 698 { 699 cachedTransfer.setMore(Boolean.TRUE); 700 } 701 }); 702 } finally { 703 delivery.afterSend(); // Allow for freeing resources after write of buffered data 704 } 705 706 tpSession.incrementOutgoingId(); 707 tpSession.decrementRemoteIncomingWindow(); 708 709 if (payload is null || !payload.hasRemaining()) 710 { 711 session.incrementOutgoingBytes(-pending); 712 713 if (!cachedTransfer.getMore().booleanValue) { 714 // Clear the in-progress delivery marker 715 tpLink.setInProgressDelivery(null); 716 717 delivery.setDone(); 718 tpLink.setDeliveryCount(tpLink.getDeliveryCount().add(UnsignedInteger.ONE)); 719 tpLink.setLinkCredit(tpLink.getLinkCredit().subtract(UnsignedInteger.ONE)); 720 session.incrementOutgoingDeliveries(-1); 721 snd.decrementQueued(); 722 } 723 } 724 else 725 { 726 session.incrementOutgoingBytes(-(pending - payload.remaining())); 727 728 // Remember the delivery we are still processing 729 // the body transfer frames for 730 tpLink.setInProgressDelivery(delivery); 731 } 732 733 if (_emitFlowEventOnSend && snd.getLocalState() != EndpointState.CLOSED) { 734 getConnectionImpl().put(Type.LINK_FLOW, snd); 735 } 736 } 737 738 if(wasDone && delivery.getLocalState() !is null) 739 { 740 TransportDelivery tpDelivery = delivery.getTransportDelivery(); 741 // Use cached object as holder of data for immediate write to the FrameWriter 742 cachedDisposition.setFirst(tpDelivery.getDeliveryId()); 743 cachedDisposition.setLast(tpDelivery.getDeliveryId()); 744 cachedDisposition.setRole(Role.SENDER); 745 cachedDisposition.setSettled(new Boolean(delivery.isSettled())); 746 cachedDisposition.setBatchable(Boolean.FALSE); // Enforce default is written 747 if(delivery.isSettled()) 748 { 749 tpDelivery.settled(); 750 } 751 cachedDisposition.setState(delivery.getLocalState()); 752 753 writeFrame(tpSession.getLocalChannel(), 
cachedDisposition, null, null); 754 } 755 756 return !delivery.isBuffered(); 757 } 758 759 private bool processTransportWorkReceiver(DeliveryImpl delivery, ReceiverImpl rcv) 760 { 761 TransportDelivery tpDelivery = delivery.getTransportDelivery(); 762 SessionImpl session = rcv.getSession(); 763 TransportSession tpSession = session.getTransportSession(); 764 765 if (tpSession.isLocalChannelSet()) 766 { 767 bool settled = delivery.isSettled(); 768 DeliveryState localState = delivery.getLocalState(); 769 // Use cached object as holder of data for immediate write to the FrameWriter 770 cachedDisposition.setFirst(tpDelivery.getDeliveryId()); 771 cachedDisposition.setLast(tpDelivery.getDeliveryId()); 772 cachedDisposition.setRole(Role.RECEIVER); 773 cachedDisposition.setSettled(new Boolean(settled)); 774 cachedDisposition.setState(localState); 775 cachedDisposition.setBatchable(new Boolean(false)); // Enforce default is written 776 777 if(localState is null && settled) { 778 cachedDisposition.setState(delivery.getDefaultDeliveryState()); 779 } 780 781 writeFrame(tpSession.getLocalChannel(), cachedDisposition, null, null); 782 if (settled) 783 { 784 tpDelivery.settled(); 785 } 786 return true; 787 } 788 789 return false; 790 } 791 792 private void processReceiverFlow() 793 { 794 // logInfo("processReceiverFlow out ------------------------"); 795 if(_connectionEndpoint !is null && _isOpenSent && !_isCloseSent) 796 { 797 EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); 798 while(endpoint !is null) 799 { 800 ReceiverImpl receiver = cast(ReceiverImpl) endpoint; 801 if(receiver !is null) 802 { 803 TransportLink transportLink = getTransportState(receiver); 804 TransportSession transportSession = getTransportState(receiver.getSession()); 805 806 if(receiver.getLocalState() == EndpointState.ACTIVE && transportSession.isLocalChannelSet() && !receiver.detached()) 807 { 808 int credits = receiver.clearUnsentCredits(); 809 //logInfo("processReceiverFlow in 
------------------------"); 810 if(credits != 0 || receiver.getDrain() || 811 transportSession.getIncomingWindowSize() == (UnsignedInteger.ZERO)) 812 { 813 //logInfo("processReceiverFlow in in ------------------------"); 814 transportLink.addCredit(credits); 815 writeFlow(transportSession, transportLink); 816 } 817 } 818 } 819 endpoint = endpoint.transportNext(); 820 } 821 endpoint = _connectionEndpoint.getTransportHead(); 822 while(endpoint !is null) 823 { 824 SessionImpl session = cast(SessionImpl) endpoint; 825 if(session !is null) 826 { 827 TransportSession transportSession = getTransportState(session); 828 // logInfo("processReceiverFlow in ------------------------"); 829 if(session.getLocalState() == EndpointState.ACTIVE && transportSession.isLocalChannelSet()) 830 { 831 if(transportSession.getIncomingWindowSize()==(UnsignedInteger.ZERO)) 832 { 833 // logInfo("processReceiverFlow in in------------------------"); 834 writeFlow(transportSession, null); 835 } 836 } 837 } 838 endpoint = endpoint.transportNext(); 839 } 840 } 841 } 842 843 private void processAttach() 844 { 845 if(_connectionEndpoint !is null && _isOpenSent && !_isCloseSent) 846 { 847 EndpointImpl endpoint = _connectionEndpoint.getTransportHead(); 848 849 while(endpoint !is null) 850 { 851 LinkImpl link = cast(LinkImpl) endpoint; 852 if(link !is null) 853 { 854 TransportLink transportLink = getTransportState(link); 855 SessionImpl session = link.getSession(); 856 TransportSession transportSession = getTransportState(session); 857 if(link.getLocalState() != EndpointState.UNINITIALIZED && !transportLink.attachSent() && transportSession.isLocalChannelSet()) 858 { 859 860 if( (link.getRemoteState() == EndpointState.ACTIVE 861 && !transportLink.isLocalHandleSet()) || link.getRemoteState() == EndpointState.UNINITIALIZED) 862 { 863 864 UnsignedInteger localHandle = transportSession.allocateLocalHandle(transportLink); 865 866 if(link.getRemoteState() == EndpointState.UNINITIALIZED) 867 { 868 // 
logInfo("processAttach UNINITIALIZED ...........................");
                                transportSession.addHalfOpenLink(transportLink);
                            }

                            // Build the outgoing Attach from the link's locally configured values.
                            Attach attach = new Attach();
                            attach.setHandle(localHandle);
                            attach.setName(transportLink.getName() is null ? null : new String(transportLink.getName()));

                            if (link.getSenderSettleMode() !is null)
                            {
                                attach.setSndSettleMode(link.getSenderSettleMode());
                            }

                            if (link.getReceiverSettleMode() !is null)
                            {
                                attach.setRcvSettleMode(link.getReceiverSettleMode());
                            }

                            if (link.getSource() !is null)
                            {
                                attach.setSource(link.getSource());
                            }

                            if (link.getTarget() !is null)
                            {
                                attach.setTarget(link.getTarget());
                            }

                            // Properties and capabilities are not copied onto the Attach:
                            // the setter calls in the three branches below are disabled.
                            if (link.getProperties() !is null)
                            {
                                //attach.setProperties(link.getProperties());
                            }

                            if (link.getOfferedCapabilities() !is null)
                            {
                                // attach.setOfferedCapabilities(new ArrayList!Symbol(link.getOfferedCapabilities()));
                            }

                            if (link.getDesiredCapabilities() !is null)
                            {
                                // attach.setDesiredCapabilities(new ArrayList!Symbol (link.getDesiredCapabilities()));
                            }

                            if (link.getMaxMessageSize() !is null)
                            {
                                attach.setMaxMessageSize(link.getMaxMessageSize());
                            }

                            attach.setRole(cast(ReceiverImpl)endpoint !is null ? Role.RECEIVER : Role.SENDER);

                            // Only senders announce an initial delivery count.
                            if (cast(SenderImpl)link !is null)
                            {
                                attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
                            }

                            writeFrame(transportSession.getLocalChannel(), attach, null, null);
                            transportLink.sentAttach();
                        }
                    }
                }
                endpoint = endpoint.transportNext();
            }
        }
    }

    /**
     * Writes the AMQP protocol header through the frame writer the first time
     * it is called; once `_headerWritten` is set, further calls do nothing.
     */
    private void processHeader()
    {
        if (_headerWritten)
        {
            return;
        }

        outputHeaderDescription();
        _frameWriter.writeHeader(AmqpHeader.HEADER);
        _headerWritten = true;
    }

    /**
     * Reports the outgoing protocol header to the frame log and, when one is
     * installed, to the protocol tracer. Does nothing unless frame tracing is
     * enabled.
     */
    private void outputHeaderDescription()
    {
        if (!isFrameTracingEnabled())
        {
            return;
        }

        log(TransportImpl.OUTGOING, new String(HEADER_DESCRIPTION));

        ProtocolTracer headerTracer = getProtocolTracer();
        if (headerTracer !is null)
        {
            headerTracer.sentHeader(HEADER_DESCRIPTION);
        }
    }

    /**
     * Emits the Open performative on channel 0, at most once.
     *
     * The frame is sent when a transport condition has been set, or when a
     * connection is bound and its local state is no longer UNINITIALIZED.
     */
    private void processOpen()
    {
        if (_isOpenSent)
        {
            return;
        }

        // Without a transport condition, wait for a bound connection that has left UNINITIALIZED.
        if (!_conditionSet
            && (_connectionEndpoint is null
                || _connectionEndpoint.getLocalState() == EndpointState.UNINITIALIZED))
        {
            return;
        }

        Open open = new Open();
        if (_connectionEndpoint is null)
        {
            open.setContainerId(new String(""));
        }
        else
        {
            string containerId = _connectionEndpoint.getLocalContainerId();
            open.setContainerId(new String(containerId is null ? "" : containerId));
            open.setHostname(_connectionEndpoint.getHostname() is null ? null : new String(_connectionEndpoint.getHostname()));

            // Capabilities and properties are always sent as null; copying them
            // from the connection endpoint is disabled in this implementation.
            open.setDesiredCapabilities(null);
            open.setOfferedCapabilities(null);
            open.setProperties(null);
        }

        if (_maxFrameSize > 0)
        {
            open.setMaxFrameSize(UnsignedInteger.valueOf(_maxFrameSize));
        }

        if (_channelMax > 0)
        {
            open.setChannelMax(UnsignedShort.valueOf(cast(short) _channelMax));
        }

        // as per the recommendation in the spec, advertise half our
        // actual timeout to the remote
        if (_localIdleTimeout > 0)
        {
            open.setIdleTimeOut(new UnsignedInteger(_localIdleTimeout / 2));
        }

        // The flag is raised before the frame is handed to the writer.
        _isOpenSent = true;

        writeFrame(0, open, null, null);
    }

    /**
     * Sends a Begin for every session on the connection's transport work list
     * whose local state has left UNINITIALIZED and for which no Begin has been
     * sent yet. Runs only after Open has been sent and before Close is sent.
     */
    private void processBegin()
    {
        if(_connectionEndpoint !is null && _isOpenSent && !_isCloseSent)
        {
            EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
            while(endpoint !is null)
            {
                SessionImpl session = cast(SessionImpl) endpoint;
                if(session !is null)
                {
                    TransportSession transportSession = getTransportState(session);
                    if(session.getLocalState() != EndpointState.UNINITIALIZED && !transportSession.beginSent())
                    {
                        int channelId = allocateLocalChannel(transportSession);
                        Begin begin = new Begin();

                        // The remote channel is echoed back only when the remote side has left UNINITIALIZED.
                        if(session.getRemoteState() != EndpointState.UNINITIALIZED)
                        {
                            begin.setRemoteChannel(UnsignedShort.valueOf(cast(short) transportSession.getRemoteChannel()));
                        }

                        transportSession.updateIncomingWindow();

                        begin.setHandleMax(transportSession.getHandleMax());
                        begin.setIncomingWindow(transportSession.getIncomingWindowSize());
begin.setOutgoingWindow(transportSession.getOutgoingWindowSize());
                        begin.setNextOutgoingId(transportSession.getNextOutgoingId());

                        // Session properties and capabilities are not copied onto the Begin:
                        // the setter calls in the three branches below are disabled.
                        if(session.getProperties() !is null)
                        {
                            // begin.setProperties(session.getProperties());
                        }

                        if(session.getOfferedCapabilities() !is null)
                        {
                            //begin.setOfferedCapabilities(new ArrayList!Symbol(session.getOfferedCapabilities()));
                        }

                        if(session.getDesiredCapabilities() !is null)
                        {
                            // begin.setDesiredCapabilities(new ArrayList!Symbol (session.getDesiredCapabilities()));
                        }

                        writeFrame(channelId, begin, null, null);
                        transportSession.sentBegin();
                    }
                }
                endpoint = endpoint.transportNext();
            }
        }
    }

    /**
     * Returns the transport-level state attached to `session`, creating and
     * attaching a new TransportSession when the session has none yet.
     */
    private TransportSession getTransportState(SessionImpl session)
    {
        TransportSession state = session.getTransportSession();
        if (state !is null)
        {
            return state;
        }

        state = new TransportSession(this, session);
        session.setTransportSession(state);
        return state;
    }

    /**
     * Returns the transport-level state of `link`; when the link has none,
     * the result of TransportLink.createTransportLink(link) is returned.
     */
    private TransportLink getTransportState(LinkImpl link)
    {
        TransportLink state = link.getTransportLink();
        return state !is null ? state : TransportLink.createTransportLink(link);
    }

    /**
     * Assigns the lowest channel number not present in `_localSessions` to
     * `transportSession` and records the mapping.
     *
     * Returns: the allocated channel, or -1 when every channel below the
     *          connection's getMaxChannels() value is already taken.
     */
    private int allocateLocalChannel(TransportSession transportSession)
    {
        for (int candidate = 0; candidate < _connectionEndpoint.getMaxChannels(); candidate++)
        {
            if (_localSessions.containsKey(candidate))
            {
                continue;
            }

            _localSessions.put(candidate, transportSession);
            transportSession.setLocalChannel(candidate);
            return candidate;
        }

        return -1;
    }

    /**
     * Removes the session's local channel from `_localSessions`, clears it on
     * the transport session, and returns the channel number that was released.
     */
    private int freeLocalChannel(TransportSession transportSession)
    {
        int channel = transportSession.getLocalChannel();
        _localSessions.remove(channel);
transportSession.freeLocalChannel();
        return channel;
    }

    /**
     * Sends an End for every locally closed session that still owns a local
     * channel, provided Close has not been sent yet.
     *
     * A session that still has queued outgoing messages is skipped for now
     * (without clearing its modified flag). Every other session endpoint
     * visited has its modified flag cleared, whether or not an End was written.
     */
    private void processEnd()
    {
        if (_connectionEndpoint is null || !_isOpenSent)
        {
            return;
        }

        EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
        while (endpoint !is null)
        {
            SessionImpl session = cast(SessionImpl) endpoint;
            if (session !is null)
            {
                // The transport session is created lazily elsewhere, so it may
                // be absent here; a session without one has no channel to end.
                TransportSession transportSession = session.getTransportSession();
                if (session.getLocalState() == EndpointState.CLOSED
                    && transportSession !is null
                    && transportSession.isLocalChannelSet()
                    && !_isCloseSent)
                {
                    if (hasSendableMessages(session))
                    {
                        endpoint = endpoint.transportNext();
                        continue;
                    }

                    int channel = freeLocalChannel(transportSession);
                    End end = new End();

                    // Same null-safe check that processClose applies to its condition.
                    ErrorCondition localError = endpoint.getCondition();
                    if (localError !is null && localError.getCondition() !is null)
                    {
                        end.setError(localError);
                    }

                    writeFrame(channel, end, null, null);
                }
                endpoint.clearModified();
            }

            endpoint = endpoint.transportNext();
        }
    }

    /**
     * Tells whether any sender still has queued deliveries that could be sent.
     *
     * Params:
     *   session = restricts the search to senders of this session; null means
     *             all senders on the connection.
     *
     * Returns: false when no connection is bound, when a Close has been
     *          received, or when `session` has already received an End;
     *          otherwise true if a matching sender has a non-zero queue and
     *          has not received a Detach.
     */
    private bool hasSendableMessages(SessionImpl session)
    {
        if (_connectionEndpoint is null)
        {
            return false;
        }

        if (_closeReceived || (session !is null && session.getTransportSession().endReceived()))
        {
            return false;
        }

        for (EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
             endpoint !is null;
             endpoint = endpoint.transportNext())
        {
            SenderImpl sender = cast(SenderImpl) endpoint;
            if (sender is null)
            {
                continue;
            }

            if ((session is null || sender.getSession() == session)
                && sender.getQueued() != 0
                && !getTransportState(sender).detachReceived())
            {
                return true;
            }
        }
        return false;
    }

    /**
     * Sends the Close performative on channel 0, at most once.
     *
     * Close is written when a transport condition is set or the bound
     * connection is locally CLOSED, and only once no sender has sendable
     * messages left. The error carried is the transport's own condition when
     * no connection is bound, otherwise the connection's condition.
     */
    private void processClose()
    {
        // NOTE(review): the identifier is spelled HUNT_DEDBUG, presumably a typo for
        // HUNT_DEBUG. Left unchanged because enabling it would log on every call.
        version(HUNT_DEDBUG) info("Closing...");

        bool closeWanted = _conditionSet
            || (_connectionEndpoint !is null
                && _connectionEndpoint.getLocalState() == EndpointState.CLOSED);
        if (!closeWanted || _isCloseSent)
        {
            return;
        }

        if (hasSendableMessages(null))
        {
            return;
        }

        Close close = new Close();
        ErrorCondition localError = _connectionEndpoint is null
            ? getCondition()
            : _connectionEndpoint.getCondition();

        if (localError !is null && localError.getCondition() !is null)
        {
            close.setError(localError);
        }

        _isCloseSent = true;

        writeFrame(0, close, null, null);

        if (_connectionEndpoint !is null)
        {
            _connectionEndpoint.clearModified();
        }
    }

    /**
     * Hands a frame to the frame writer.
     *
     * Params:
     *   channel           = channel number to write the frame on
     *   frameBody         = performative to encode (callers in this class pass
     *                       null when ticking with nothing pending)
     *   payload           = optional payload, may be null
     *   onPayloadTooLarge = callback forwarded to the frame writer, may be null
     */
    protected void writeFrame(int channel, FrameBody frameBody,
                              ReadableBuffer payload, Runnable onPayloadTooLarge)
    {
        _frameWriter.writeFrame(channel, cast(Object)frameBody, payload, onPayloadTooLarge);
    }

    //==================================================================================================================

    /// Returns the connection endpoint currently held by this transport (may be null).
    override
    public ConnectionImpl getConnectionImpl()
    {
        return _connectionEndpoint;
    }

    /// No-op.
    override
    void postFinal() {}

    /// No-op.
    override
    void doFree() { }

    //==================================================================================================================
    // handle incoming amqp data


    /**
     * Handles an incoming Open: marks the remote state ACTIVE, forwards the
     * performative to the bound connection (or stores it in `_open` when no
     * connection is bound), and records the peer's frame size, channel-max and
     * idle-timeout values when they are greater than zero.
     */
    public void handleOpen(Open open, Binary payload, int channel)
    {
        setRemoteState(EndpointState.ACTIVE);
        if (_connectionEndpoint !is null)
        {
            _connectionEndpoint.handleOpen(open);
        }
        else
        {
            _open = open;
        }

        int effectiveMaxFrameSize = _remoteMaxFrameSize;
        if (open.getMaxFrameSize().longValue() > 0)
        {
            _remoteMaxFrameSize = cast(int) open.getMaxFrameSize().longValue();
            effectiveMaxFrameSize = cast(int) min(open.getMaxFrameSize().longValue(), Integer.MAX_VALUE);
        }

        // A configured outbound limit caps the size used for outgoing frames.
        // NOTE(review): this uses the peer's raw value even when it is 0 - confirm that is intended.
        if (_outboundFrameSizeLimit > 0)
        {
            effectiveMaxFrameSize = cast(int) min(open.getMaxFrameSize().longValue(), _outboundFrameSizeLimit);
        }

        _frameWriter.setMaxFrameSize(effectiveMaxFrameSize);

        if (open.getChannelMax().longValue() > 0)
        {
            _remoteChannelMax = cast(int) open.getChannelMax().longValue();
        }

        if (open.getIdleTimeOut() !is null && open.getIdleTimeOut().longValue() > 0)
        {
            _remoteIdleTimeout = open.getIdleTimeOut().intValue();
        }
    }

    /**
     * Handles an incoming Begin on `channel`.
     *
     * A Begin without a remote-channel creates a new session on the
     * connection; otherwise it is matched to the local session that was begun
     * on the referenced channel.
     *
     * Throws: NullPointerException when the referenced local channel has no
     *         session; the message now names the offending channel.
     */
    public void handleBegin(Begin begin, Binary payload, int channel)
    {
        // TODO - check channel < max_channel
        TransportSession transportSession = _remoteSessions.get(channel);
        if(transportSession !is null)
        {
            // TODO - fail due to begin on begun session
        }
        else
        {
            SessionImpl session;
            if(begin.getRemoteChannel() is null)
            {
                session = _connectionEndpoint.session();
                transportSession = getTransportState(session);
            }
            else
            {
                transportSession = _localSessions.get(begin.getRemoteChannel().intValue());
                if (transportSession is null) {
                    // TODO handle failure rather than just throwing a nicer NPE
                    throw new NullPointerException("uncorrelated channel: "
                        ~ to!string(begin.getRemoteChannel().intValue()));
                }
                session = transportSession.getSession();

            }
            transportSession.setRemoteChannel(channel);
            session.setRemoteState(EndpointState.ACTIVE);
            transportSession.setNextIncomingId(begin.getNextOutgoingId());
            session.setRemoteProperties(begin.getProperties());
            if (begin.getDesiredCapabilities() !is null)
            {
                session.setRemoteDesiredCapabilities(begin.getDesiredCapabilities().toArray());
            }
            if (begin.getOfferedCapabilities() !is null)
            {
                session.setRemoteOfferedCapabilities(begin.getOfferedCapabilities().toArray());
            }
            _remoteSessions.put(channel, transportSession);

_connectionEndpoint.put(Type.SESSION_REMOTE_OPEN, session);
        }

    }

    /**
     * Handles an incoming Attach on `channel`.
     *
     * The attach is matched to a half-open local link of the same name, or a
     * new sender/receiver (the opposite of the peer's role) is created on the
     * session. The link's remote state and remote attributes are then filled
     * in from the performative and a LINK_REMOTE_OPEN event is posted.
     *
     * A handle above the session's handle-max closes the connection with a
     * framing error. An attach on an unknown channel, or on a remote handle
     * that is already in use, is ignored; in particular no LINK_REMOTE_OPEN
     * event is posted for it, since there is no link to report.
     */
    public void handleAttach(Attach attach, Binary payload, int channel)
    {
        TransportSession transportSession = _remoteSessions.get(channel);
        if (transportSession is null)
        {
            // TODO - fail due to attach on non-begun session
            return;
        }

        SessionImpl session = transportSession.getSession();
        UnsignedInteger handle = attach.getHandle();
        if (handle > (transportSession.getHandleMax())) {
            // The handle-max value is the highest handle value that can be used on the session. A peer MUST
            // NOT attempt to attach a link using a handle value outside the range that its partner can handle.
            // A peer that receives a handle outside the supported range MUST close the connection with the
            // framing-error error-code.
            ErrorCondition condition =
                new ErrorCondition(ConnectionError.FRAMING_ERROR,
                                   new String("handle-max exceeded"));
            _connectionEndpoint.setCondition(condition);
            _connectionEndpoint.setLocalState(EndpointState.CLOSED);
            if (!_isCloseSent) {
                Close close = new Close();
                close.setError(condition);
                _isCloseSent = true;
                writeFrame(0, close, null, null);
            }
            close_tail();
            return;
        }

        if (transportSession.getLinkFromRemoteHandle(handle) !is null)
        {
            // TODO - fail - attempt attach on a handle which is in use
            // No link was resolved, so no event is posted (previously a
            // LINK_REMOTE_OPEN event carrying a null link was emitted here).
            return;
        }

        // Decode the link name once; it is needed for lookup, creation and bookkeeping.
        string linkName = cast(string)(attach.getName().getBytes());

        LinkImpl link = null;
        TransportLink transportLink = transportSession.resolveHalfOpenLink(linkName);
        if (transportLink is null)
        {
            // The peer's role is the mirror image of the local endpoint to create.
            link = (attach.getRole() == Role.RECEIVER)
                   ? session.sender(linkName)
                   : session.receiver(linkName);
            transportLink = getTransportState(link);
        }
        else
        {
            link = cast(LinkImpl)(transportLink.getLink());
        }

        if (attach.getRole() == Role.SENDER)
        {
            transportLink.setDeliveryCount(attach.getInitialDeliveryCount());
        }

        link.setRemoteState(EndpointState.ACTIVE);
        link.setRemoteSource(attach.getSource());
        link.setRemoteTarget(attach.getTarget());

        link.setRemoteReceiverSettleMode(attach.getRcvSettleMode());
        link.setRemoteSenderSettleMode(attach.getSndSettleMode());

        link.setRemoteProperties(attach.getProperties());

        if (attach.getDesiredCapabilities() !is null)
        {
            link.setRemoteDesiredCapabilities(attach.getDesiredCapabilities().toArray());
        }
        if (attach.getOfferedCapabilities() !is null)
        {
            link.setRemoteOfferedCapabilities(attach.getOfferedCapabilities().toArray());
        }

        link.setRemoteMaxMessageSize(attach.getMaxMessageSize());

        transportLink.setName(linkName);
        transportLink.setRemoteHandle(handle);
        transportSession.addLinkRemoteHandle(transportLink, handle);

        _connectionEndpoint.put(Type.LINK_REMOTE_OPEN, link);
    }

    /**
     * Forwards an incoming Flow to the session registered for `channel`;
     * a Flow on an unknown channel is ignored.
     */
    public void handleFlow(Flow flow, Binary payload, int channel)
    {
        TransportSession transportSession = _remoteSessions.get(channel);
        if (transportSession is null)
        {
            // TODO - fail due to flow on non-begun session
            return;
        }

        transportSession.handleFlow(flow);
    }

    /**
     * Forwards an incoming Transfer and its payload to the session registered
     * for `channel`; a Transfer on an unknown channel is ignored.
     */
    public void handleTransfer(Transfer transfer, Binary payload, int channel)
    {
        // TODO - check channel < max_channel
        TransportSession transportSession = _remoteSessions.get(channel);
        if (transportSession is null)
        {
            // TODO - fail due to transfer on non-begun session
            return;
        }

        transportSession.handleTransfer(transfer, payload);
    }

    /**
     * Forwards an incoming Disposition to the session registered for
     * `channel`; a Disposition on an unknown channel is ignored.
     */
    public void handleDisposition(Disposition disposition, Binary payload, int channel)
    {
        TransportSession transportSession = _remoteSessions.get(channel);
        if (transportSession is null)
        {
            // TODO - fail due to disposition on non-begun session
            return;
        }

        transportSession.handleDisposition(disposition);
    }

    /**
     * Handles an incoming Detach: releases the link's remote handle, posts
     * LINK_REMOTE_CLOSE (when the detach is flagged closed) or
     * LINK_REMOTE_DETACH, marks the link remotely CLOSED and copies any error
     * into the link's remote condition. A Detach for an unknown channel or
     * handle is ignored.
     */
    public void handleDetach(Detach detach, Binary payload, int channel)
    {
        TransportSession transportSession = _remoteSessions.get(channel);
        if (transportSession is null)
        {
            // TODO - fail due to detach on non-begun session
            return;
        }

        TransportLink transportLink = transportSession.getLinkFromRemoteHandle(detach.getHandle());
        if (transportLink is null)
        {
            // TODO - fail - detach on a handle which is not attached
            return;
        }

        LinkImpl link = cast(LinkImpl)(transportLink.getLink());
        transportLink.receivedDetach();
        transportSession.freeRemoteHandle(transportLink.getRemoteHandle());

        Boolean closedFlag = detach.getClosed();
        if (closedFlag !is null && closedFlag.booleanValue())
        {
            _connectionEndpoint.put(Type.LINK_REMOTE_CLOSE, link);
        }
        else
        {
            _connectionEndpoint.put(Type.LINK_REMOTE_DETACH, link);
        }

        transportLink.clearRemoteHandle();
        link.setRemoteState(EndpointState.CLOSED);
        if (detach.getError() !is null)
        {
            link.getRemoteCondition().copyFrom(detach.getError());
        }
    }

    /// Not implemented; only reports the missing implementation.
    void invoke(FrameBodyHandler!int handler, Binary payload, int context)
    {
        implementationMissing(false);
    }

    /**
     * Handles an incoming End: unregisters the session from `channel`, marks
     * it remotely CLOSED, copies any error into its remote condition and posts
     * SESSION_REMOTE_CLOSE. An End on an unknown channel is ignored.
     */
    public void handleEnd(End end, Binary payload, int channel)
    {
        TransportSession transportSession = _remoteSessions.get(channel);
        if(transportSession is null)
        {
            // TODO - fail due to end on non-begun session
        }
        else
        {
_remoteSessions.remove(channel);
            transportSession.receivedEnd();
            transportSession.unsetRemoteChannel();

            SessionImpl endedSession = transportSession.getSession();
            endedSession.setRemoteState(EndpointState.CLOSED);

            ErrorCondition remoteError = end.getError();
            if (remoteError !is null)
            {
                endedSession.getRemoteCondition().copyFrom(remoteError);
            }

            _connectionEndpoint.put(Type.SESSION_REMOTE_CLOSE, endedSession);
        }
    }

    /**
     * Handles an incoming Close: records that Close was received, resets the
     * remote idle timeout to 0 and marks the transport remotely CLOSED. When a
     * connection is bound it is marked remotely CLOSED too, any error is
     * copied into its remote condition, and CONNECTION_REMOTE_CLOSE is posted.
     */
    public void handleClose(Close close, Binary payload, int channel)
    {
        _closeReceived = true;
        _remoteIdleTimeout = 0;
        setRemoteState(EndpointState.CLOSED);

        if (_connectionEndpoint is null)
        {
            return;
        }

        _connectionEndpoint.setRemoteState(EndpointState.CLOSED);
        if (close.getError() !is null)
        {
            _connectionEndpoint.getRemoteCondition().copyFrom(close.getError());
        }

        _connectionEndpoint.put(Type.CONNECTION_REMOTE_CLOSE, _connectionEndpoint);
    }

    /**
     * Logs and traces an incoming frame, then dispatches it to the handler
     * matching the concrete type of its body.
     *
     * Returns: the current value of `_closeReceived`.
     * Throws: IllegalStateException when the transport is not handling frames.
     */
    override
    public bool handleFrame(TransportFrame frame)
    {
        if (!isHandlingFrames())
        {
            throw new IllegalStateException("Transport cannot accept frame: ");
        }

        log(INCOMING, frame);

        ProtocolTracer frameTracer = _protocolTracer.get();
        if (frameTracer !is null)
        {
            frameTracer.receivedFrame(frame);
        }

        // The body is tested against each known performative type in turn;
        // the first successful cast handles the frame.
        if (auto attach = cast(Attach) frame.getBody())
        {
            attach.invoke(this, frame.getPayload(), frame.getChannel());
return _closeReceived;
        }

        // Remaining performative types, tried in a fixed order.
        if (auto open = cast(Open) frame.getBody())
        {
            open.invoke(this, frame.getPayload(), frame.getChannel());
            return _closeReceived;
        }

        if (auto begin = cast(Begin) frame.getBody())
        {
            begin.invoke(this, frame.getPayload(), frame.getChannel());
            return _closeReceived;
        }

        if (auto detach = cast(Detach) frame.getBody())
        {
            detach.invoke(this, frame.getPayload(), frame.getChannel());
            return _closeReceived;
        }

        if (auto close = cast(Close) frame.getBody())
        {
            close.invoke(this, frame.getPayload(), frame.getChannel());
            return _closeReceived;
        }

        if (auto flow = cast(Flow) frame.getBody())
        {
            flow.invoke(this, frame.getPayload(), frame.getChannel());
            return _closeReceived;
        }

        if (auto end = cast(End) frame.getBody())
        {
            end.invoke(this, frame.getPayload(), frame.getChannel());
            return _closeReceived;
        }

        if (auto transfer = cast(Transfer) frame.getBody())
        {
            transfer.invoke(this, frame.getPayload(), frame.getChannel());
            return _closeReceived;
        }

        if (auto emptyframe = cast(EmptyFrame) frame.getBody())
        {
            emptyframe.invoke(this, frame.getPayload(), frame.getChannel());
            return _closeReceived;
        }

        if (auto dispos = cast(Disposition) frame.getBody())
        {
            dispos.invoke(this, frame.getPayload(), frame.getChannel());
            return _closeReceived;
        }

        // No known performative matched: report the runtime type and carry on.
        auto unhandledBody = cast(Object)frame.getBody();
        warning("unhandled: ", typeid(unhandledBody));
        return _closeReceived;
    }

    /**
     * Posts an event of the given type with `context` on the bound
     * connection; does nothing when no connection is bound.
     */
    void put(Type type, Object context) {
        if (_connectionEndpoint !is null) {
_connectionEndpoint.put(type, context);
        }
    }

    /// Posts TRANSPORT_CLOSED once both the head-closed and tail-closed events have been posted.
    private void maybePostClosed()
    {
        if (postedHeadClosed && postedTailClosed) {
            put(Type.TRANSPORT_CLOSED, this);
        }
    }

    /**
     * Called when the input side is closed.
     *
     * If no Close was received, or an error is supplied, the head is marked
     * closed and - unless a condition is already set - a framing-error
     * condition is recorded ("connection aborted" when `error` is null,
     * otherwise the error's string form). TRANSPORT_ERROR and
     * TRANSPORT_TAIL_CLOSED are each posted at most once.
     */
    override
    public void closed(TransportException error)
    {
        if (!_closeReceived || error !is null) {
            // Set an error condition, but only if one was not already set
            if(!_conditionSet) {
                string description = error is null ? "connection aborted" : error.toString();
                setCondition(new ErrorCondition(ConnectionError.FRAMING_ERROR, new String( description)));
            }

            _head_closed = true;
        }

        if (_conditionSet && !postedTransportError) {
            put(Type.TRANSPORT_ERROR, this);
            postedTransportError = true;
        }

        if (!postedTailClosed) {
            put(Type.TRANSPORT_TAIL_CLOSED, this);
            postedTailClosed = true;
            maybePostClosed();
        }
    }

    /// True while a connection is bound or the remote state is still UNINITIALIZED.
    override
    public bool isHandlingFrames()
    {
        return _connectionEndpoint !is null || getRemoteState() == EndpointState.UNINITIALIZED;
    }

    /// Returns the installed protocol tracer, or null when none is set.
    override
    public ProtocolTracer getProtocolTracer()
    {
        return _protocolTracer.get();
    }

    /// Installs (or replaces) the protocol tracer.
    override
    public void setProtocolTracer(ProtocolTracer protocolTracer)
    {
        this._protocolTracer.set(protocolTracer);
    }

    /// Same buffer as tail().
    override
    public ByteBuffer getInputBuffer()
    {
        return tail();
    }

    /**
     * Runs process() and wraps the outcome in a TransportResult: ok on normal
     * return, error when a TransportException escapes process().
     */
    override
    public TransportResult processInput()
    {
        try {
            process();
            return TransportResultFactory.ok();
        } catch (TransportException e) {
            return TransportResultFactory.error(e);
        }
    }

    /// Calls pending() and then returns head().
    override
    public ByteBuffer getOutputBuffer()
    {
        pending();
        return head();
    }

    /// Pops as many bytes as the output processor's head buffer position indicates.
    override
    public void outputConsumed()
    {
        pop(_outputProcessor.head().position());
    }

    /// Initialises the transport if needed and returns the input processor's capacity.
    override
    public int capacity()
    {
        init();
        return _inputProcessor.capacity();
    }

    /// Initialises the transport if needed and returns the input processor's tail buffer.
    override
    public ByteBuffer tail()
    {
        init();
        return _inputProcessor.tail();
    }

    /**
     * Processes the bytes currently in the input buffer and updates the
     * input byte counter.
     *
     * A TransportException raised while processing marks the head as closed.
     * It is not rethrown, but it is now logged instead of being dropped
     * silently.
     */
    override
    public void process()
    {
        _processingStarted = true;

        try {
            init();
            int beforePosition = _inputProcessor.position();
            _inputProcessor.process();
            _bytesInput += beforePosition - _inputProcessor.position();
        } catch (TransportException e) {
            _head_closed = true;
            // Keep the non-throwing behaviour, but leave a trace of the failure.
            warning("Transport input processing failed: ", e.msg);
        }
    }

    /// Initialises the transport if needed and closes the input processor's tail.
    override
    public void close_tail()
    {
        init();
        _inputProcessor.close_tail();
    }

    /// Initialises the transport if needed and returns the output processor's pending count.
    override
    public int pending()
    {
        init();
        return _outputProcessor.pending();
    }

    /// Initialises the transport if needed and returns the output processor's head buffer.
    override
    public ByteBuffer head()
    {
        init();
        return _outputProcessor.head();
    }

    /**
     * Removes `bytes` bytes from the output and adds them to the output byte
     * counter. When pending() then reports a negative value,
     * TRANSPORT_HEAD_CLOSED is posted (once).
     */
    override
    public void pop(int bytes)
    {
        init();
        _outputProcessor.pop(bytes);
        _bytesOutput += bytes;

        int remaining = pending();
        if (remaining < 0 && !postedHeadClosed) {
            put(Type.TRANSPORT_HEAD_CLOSED, this);
            postedHeadClosed = true;
            maybePostClosed();
        }
    }

    /// Sets the local idle timeout.
    override
    public void setIdleTimeout(int timeout) {
        _localIdleTimeout = timeout;
    }

    /// Returns the local idle timeout.
    override
    public int getIdleTimeout() {
        return _localIdleTimeout;
    }

    /// Returns the idle timeout taken from the peer's Open (reset to 0 when a Close is received).
    override
    public int getRemoteIdleTimeout() {
        return _remoteIdleTimeout;
    }

    /**
     * Writes a frame with a null body on channel 0 whenever no output is
     * pending, under this object's monitor.
     *
     * NOTE: `now` is not used and the method always returns 0. The logic that
     * enforced the local idle timeout (closing the connection with
     * "amqp:resource-limit-exceeded" / "local-idle-timeout expired") and that
     * scheduled keep-alive frames at half the remote idle timeout was present
     * only as commented-out code and is not active.
     */
    override
    public long tick(long now)
    {
        long deadline = 0;
        synchronized(this){
            if (pending() == 0) {
                writeFrame( 0, null, null, null);
                pending();
            }
        }
        return deadline;
    }

    private long computeDeadline(long now, long timeout) {
        long deadline = now + timeout;

        // We use 0 to signal not-initialised and/or no-timeout, so in the
        // unlikely event thats to be the actual deadline, return 1
// instead
        return deadline != 0 ? deadline : 1;
    }

    /// Number of frames reported by the frame writer.
    override
    public long getFramesOutput()
    {
        return _frameWriter.getFramesOutput();
    }

    /// Number of frames reported by the frame parser.
    override
    public long getFramesInput()
    {
        return _frameParser.getFramesInput();
    }

    /// Closes the head of the output processor.
    override
    public void close_head()
    {
        _outputProcessor.close_head();
    }

    /// True when both pending() and capacity() report END_OF_STREAM.
    override
    public bool isClosed() {
        int pendingOutput = pending();
        int inputCapacity = capacity();
        return pendingOutput == END_OF_STREAM && inputCapacity == END_OF_STREAM;
    }

    /// Not implemented: reports the missing implementation and returns an empty string.
    override
    public string toString()
    {
        implementationMissing(false);
        return "";
    }

    /**
     * Override the default frame handler. Must be called before the transport starts being used
     * (e.g. getInputBuffer(), getOutputBuffer(), ssl(SslDomain) etc).
     */
    public void setFrameHandler(FrameHandler frameHandler)
    {
        _frameHandler = frameHandler;
    }

    // Direction markers passed as the `event` argument of the log overloads.
    static string INCOMING = "<-";
    static string OUTGOING = "->";

    /// Outputs a transport frame (channel, body and payload) when frame tracing is on.
    void log(string event, TransportFrame frame)
    {
        if (!isTraceFramesEnabled()) {
            return;
        }
        outputMessage(event, frame.getChannel(), cast(Object)frame.getBody(), frame.getPayload());
    }

    /// Outputs a SASL frame body on channel 0 when frame tracing is on.
    void log(string event, SaslFrameBody frameBody) {
        if (!isTraceFramesEnabled()) {
            return;
        }
        outputMessage(event, 0, cast(Object)frameBody, null);
    }

    /// Outputs a protocol header description on channel 0 when frame tracing is on.
    void log(string event, String headerDescription) {
        if (!isTraceFramesEnabled()) {
            return;
        }
        outputMessage(event, 0, headerDescription, null);
    }

    /**
     * Not implemented: only reports the missing implementation.
     *
     * The message formatting (identity and channel prefix, event, frame body
     * and a quoted payload excerpt) existed only as commented-out code.
     */
    private void outputMessage(string event, int channel, Object frameBody, Binary payload) {
        implementationMissing(false);
    }

    /// True when the TRACE_FRM level bit is set or a protocol tracer is installed.
    bool isFrameTracingEnabled()
    {
        return (_levels & TRACE_FRM) != 0 || _protocolTracer.get() !is null;
    }

    /// True when the TRACE_FRM level bit is set.
    bool isTraceFramesEnabled()
    {
        return (_levels & TRACE_FRM) != 0;
    }

    /// No-op.
    override
    void localOpen() {}

    /// No-op.
    override
    void localClose() {}

    /// Stores the selectable associated with this transport.
    public void setSelectable(Selectable selectable) {
        _selectable = selectable;
    }

    /// Returns the stored selectable.
    public Selectable getSelectable() {
        return _selectable;
    }

    /// Stores the reactor associated with this transport.
    public void setReactor(Reactor reactor) {
        _reactor = reactor;
    }

    /// Returns the stored reactor.
    public Reactor getReactor() {
        return _reactor;
    }

    /// Stores the emit-flow-event-on-send flag.
    override
    public void setEmitFlowEventOnSend(bool emitFlowEventOnSend)
    {
        _emitFlowEventOnSend = emitFlowEventOnSend;
    }

    /// Returns the emit-flow-event-on-send flag.
    override
    public bool isEmitFlowEventOnSend()
    {
        return _emitFlowEventOnSend;
    }

    /// Stores the use-read-only-output-buffer flag.
    override
    public void setUseReadOnlyOutputBuffer(bool value)
    {
        this._useReadOnlyOutputBuffer = value;
    }

    /// Returns the use-read-only-output-buffer flag.
    override
    public bool isUseReadOnlyOutputBuffer()
    {
        return _useReadOnlyOutputBuffer;
    }

    // From TransportInternal
    /**
     * Wraps the current input/output processors with `layer`, unless the same
     * layer has been added before.
     *
     * Throws: IllegalStateException once process() has been called.
     */
    override
    public void addTransportLayer(TransportLayer layer)
    {
        if (_processingStarted)
        {
            throw new IllegalStateException("Additional layer can't be added after transport has started processing");
        }

        if (_additionalTransportLayers is null)
        {
            _additionalTransportLayers = new ArrayList!TransportLayer();
        }

        if (!_additionalTransportLayers.contains(layer))
        {
            init();
            TransportWrapper transportWrapper = layer.wrap(_inputProcessor, _outputProcessor);
_inputProcessor = transportWrapper;
            _outputProcessor = transportWrapper;
            _additionalTransportLayers.add(layer);
        }
    }

    /// Stores the outbound frame size limit; handleOpen applies it only when it is greater than zero.
    override
    public void setOutboundFrameSizeLimit(int limit)
    {
        _outboundFrameSizeLimit = limit;
    }

    /// Returns the stored outbound frame size limit.
    override
    public int getOutboundFrameSizeLimit()
    {
        return _outboundFrameSizeLimit;
    }
}