1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.TransportImpl;
13 
14 import hunt.logging;
15 import std.conv:to;
16 import hunt.proton.engine.impl.ByteBufferUtils;
17 import hunt.Boolean;
18 import hunt.io.ByteBuffer;
19 import hunt.collection.ArrayList;
20 import hunt.collection.HashMap;
21 import hunt.collection.List;
22 import hunt.collection.Map;
23 import hunt.collection.LinkedHashMap;
24 
25 import hunt.proton.amqp.Binary;
26 import hunt.proton.amqp.Symbol;
27 import hunt.proton.amqp.UnsignedInteger;
28 import hunt.proton.amqp.UnsignedShort;
29 import hunt.proton.amqp.security.SaslFrameBody;
30 import hunt.proton.amqp.transport.Attach;
31 import hunt.proton.amqp.transport.Begin;
32 import hunt.proton.amqp.transport.Close;
33 import hunt.proton.amqp.transport.ConnectionError;
34 import hunt.proton.amqp.transport.DeliveryState;
35 import hunt.proton.amqp.transport.Detach;
36 import hunt.proton.amqp.transport.Disposition;
37 import hunt.proton.amqp.transport.End;
38 import hunt.proton.amqp.transport.ErrorCondition;
39 import hunt.proton.amqp.transport.Flow;
40 import hunt.proton.amqp.transport.FrameBody;
41 import hunt.proton.amqp.transport.Open;
42 import hunt.proton.amqp.transport.Role;
43 import hunt.proton.amqp.transport.Transfer;
44 import hunt.proton.codec.AMQPDefinedTypes;
45 import hunt.proton.codec.DecoderImpl;
46 import hunt.proton.codec.EncoderImpl;
47 import hunt.proton.codec.ReadableBuffer;
48 import hunt.proton.engine.Connection;
49 import hunt.proton.engine.EndpointState;
50 import hunt.proton.engine.Event;
51 import hunt.proton.engine.ProtonJTransport;
52 import hunt.proton.engine.Sasl;
53 import hunt.proton.engine.Ssl;
54 import hunt.proton.engine.SslDomain;
55 import hunt.proton.engine.SslPeerDetails;
56 import hunt.proton.engine.TransportException;
57 import hunt.proton.engine.TransportResult;
58 import hunt.proton.engine.TransportResultFactory;
59 import hunt.proton.engine.impl.ssl.SslImpl;
60 import hunt.proton.framing.TransportFrame;
61 import hunt.proton.engine.Reactor;
62 import hunt.proton.engine.Selectable;
63 import hunt.proton.engine.impl.EndpointImpl;
64 import hunt.proton.engine.impl.FrameHandler;
65 import hunt.proton.engine.impl.TransportOutputWriter;
66 import hunt.proton.engine.impl.TransportInternal;
67 import hunt.Integer;
68 import hunt.proton.engine.impl.FrameParser;
69 import hunt.proton.engine.impl.ConnectionImpl;
70 import hunt.proton.engine.impl.TransportSession;
71 import hunt.proton.engine.impl.TransportInput;
72 import hunt.proton.engine.impl.TransportOutput;
73 import hunt.proton.engine.impl.FrameWriter;
74 import hunt.proton.engine.impl.SaslImpl;
75 import hunt.proton.engine.impl.Ref;
76 import hunt.proton.engine.impl.ProtocolTracer;
77 import hunt.proton.engine.Transport;
78 import hunt.proton.engine.impl.TransportLayer;
79 import hunt.proton.engine.impl.TransportWrapper;
80 import hunt.Exceptions;
81 import hunt.proton.engine.impl.LinkImpl;
82 import hunt.proton.engine.impl.TransportLink;
83 import hunt.proton.engine.impl.SessionImpl;
84 import hunt.proton.engine.impl.SenderImpl;
85 import hunt.proton.engine.impl.TransportSender;
86 import hunt.proton.engine.impl.DeliveryImpl;
87 import hunt.proton.engine.impl.TransportDelivery;
88 import hunt.Boolean;
89 import hunt.util.Common;
90 import hunt.proton.engine.impl.ReceiverImpl;
91 import hunt.proton.engine.impl.AmqpHeader;
92 import hunt.proton.engine.Event;
93 import hunt.proton.engine.impl.TransportOutputAdaptor;
94 import hunt.String;
95 import std.algorithm;
96 import hunt.proton.amqp.transport.Attach;
97 import hunt.proton.amqp.transport.Open;
98 import hunt.proton.amqp.transport.Begin;
99 import hunt.proton.amqp.transport.Detach;
100 import hunt.proton.amqp.transport.Close;
101 import hunt.proton.amqp.transport.Flow;
102 import hunt.proton.amqp.transport.End;
103 import hunt.proton.amqp.transport.Transfer;
104 import hunt.proton.amqp.transport.EmptyFrame;
105 import hunt.proton.amqp.transport.Disposition;
106 
107 import hunt.Exceptions;
108 import hunt.util.Runnable;
109 
110 class TransportImpl : EndpointImpl, ProtonJTransport, FrameBodyHandler!int,
111         FrameHandler, TransportOutputWriter, TransportInternal
112 {
113    // static int BUFFER_RELEASE_THRESHOLD = Integer.getInteger("proton.transport_buffer_release_threshold", 2 * 1024 * 1024);
    // Initial value the constructor assigns to both _channelMax and _remoteChannelMax.
    private static int CHANNEL_MAX_LIMIT = 65535;
    // Fixed at 2 MiB in this port; the commented-out line above shows a configurable
    // lookup ("proton.transport_buffer_release_threshold") with the same default.
    // NOTE(review): no reader of this value is visible in this part of the file.
    static int BUFFER_RELEASE_THRESHOLD = 2 * 1024 * 1024;
    // Disabled helper kept for reference: it parsed an environment variable as a boolean
    // ("true", "1" or "yes"), and was used by the disabled FRM_ENABLED initialiser below.
    //private static boolean getBooleanEnv(String name)
    //{
    //    String value = System.getenv(name);
    //    return "true".equalsIgnoreCase(value) ||
    //        "1".equals(value) ||
    //        "yes".equalsIgnoreCase(value);
    //}

    // Hard-coded to false here; the disabled initialiser below read PN_TRACE_FRM instead.
    private static bool FRM_ENABLED = false;
    //private static boolean FRM_ENABLED = getBooleanEnv("PN_TRACE_FRM");
    // Hard-coded to 1024; the disabled initialiser below used the same value as the
    // default of a "proton.trace_frame_payload_length" lookup.
    private static int TRACE_FRAME_PAYLOAD_LENGTH = 1024;
   // private static int TRACE_FRAME_PAYLOAD_LENGTH = Integer.getInteger("proton.trace_frame_payload_length", 1024);
    // NOTE(review): presumably the label used when tracing the protocol header - confirm at use site.
    private static string HEADER_DESCRIPTION = "AMQP";
129 
    // trace levels, as last supplied through trace(int)
    private int _levels = 0;

    // Created by init(); also installed there as the initial _inputProcessor.
    private FrameParser _frameParser;

    // Connection attached by bind(Connection); null before the first bind.
    private ConnectionImpl _connectionEndpoint;

    // Gate most of the process*() output steps: they only run once the open frame has
    // been sent and (for most of them) while the close frame has not.
    private bool _isOpenSent;
    private bool _isCloseSent;

    private bool _headerWritten;
    // Both maps are created as LinkedHashMap instances in the constructor; unbind()
    // walks their values to unbind every TransportSession.
    private Map!(int, TransportSession) _remoteSessions ;//= new HashMap<Integer, TransportSession>();
    private Map!(int, TransportSession) _localSessions ; //= new HashMap<Integer, TransportSession>();

    // Head of the input/output processing chains. init() installs the frame parser and a
    // TransportOutputAdaptor; sasl() replaces both with the wrapper returned by SaslImpl.wrap().
    private TransportInput _inputProcessor;
    private TransportOutput _outputProcessor;

    // Codec pair created in the constructor and registered with AMQPDefinedTypes.
    private DecoderImpl _decoder ;//= new DecoderImpl();
    private EncoderImpl _encoder ;//= new EncoderImpl(_decoder);

    // The constructor sets these to: its maxFrameSize argument, Transport.MIN_MAX_FRAME_SIZE,
    // and CHANNEL_MAX_LIMIT for both channel limits.
    private int _maxFrameSize  ;//= Transport.DEFAULT_MAX_FRAME_SIZE;
    private int _remoteMaxFrameSize ;// = Transport.MIN_MAX_FRAME_SIZE;
    private int _outboundFrameSizeLimit = 0;
    private int _channelMax      ;// = CHANNEL_MAX_LIMIT;
    private int _remoteChannelMax ;// = CHANNEL_MAX_LIMIT;

    // Created in the constructor; writeInto() drains it into the caller's buffer.
    private FrameWriter _frameWriter;

    private bool _closeReceived;
    // Passed to ConnectionImpl.handleOpen() by bind() when the remote state is already initialised.
    private Open _open;
    // Lazily created by sasl().
    private SaslImpl _sasl;
    //private SslImpl _ssl;
    // Constructed in the constructor as a Ref wrapping null.
    private Ref!ProtocolTracer _protocolTracer ;//= new Ref<>(null);

    // Starts as TransportResultFactory.ok(); re-checked by oldApiCheckStateBeforeInput().
    private TransportResult _lastTransportResult ;//= TransportResultFactory.ok();

    // Set by init(); the frame size setters refuse changes once it is true.
    private bool _init;
    // sasl() refuses to create a SASL layer once this is true.
    private bool _processingStarted;
    // When true, processTransportWorkSender() posts LINK_FLOW after sending on a non-closed sender.
    private bool _emitFlowEventOnSend = true;
    private bool _useReadOnlyOutputBuffer = true;

    // Set to this transport in the constructor and handed to the FrameParser by init().
    private FrameHandler _frameHandler ;//= this;
    // Together with _isCloseSent, forms the return value of writeInto().
    private bool _head_closed = false;
    // Maintained by setCondition(): true only for a non-null error with a non-null condition.
    private bool _conditionSet;

    private bool postedHeadClosed = false;
    private bool postedTailClosed = false;
    private bool postedTransportError = false;

    // NOTE(review): idle-timeout and byte-count bookkeeping; the code that updates these
    // fields is not visible in this part of the file.
    private int _localIdleTimeout = 0;
    private int _remoteIdleTimeout = 0;
    private long _bytesInput = 0;
    private long _bytesOutput = 0;
    private long _localIdleDeadline = 0;
    private long _lastBytesInput = 0;
    private long _lastBytesOutput = 0;
    private long _remoteIdleDeadline = 0;

    private Selectable _selectable;
    private Reactor _reactor;

    private List!TransportLayer _additionalTransportLayers;

    // Cached instances used to carry the Performatives to the frame writer without the need to create
    // a new instance on each operation that triggers a write
    private Disposition cachedDisposition ;//= new Disposition();
    private Flow cachedFlow  ;//= new Flow();
    private Transfer cachedTransfer ;//= new Transfer();
198 
199     /**
200      * Application code should use {@link hunt.proton.engine.Transport.Factory#create()} instead
201      */
202     this()
203     {
204         this(Transport.DEFAULT_MAX_FRAME_SIZE);
205     }
206 
207     /**
208      * Creates a transport with the given maximum frame size.
209      * Note that the maximumFrameSize also determines the size of the output buffer.
210      */
211     this(int maxFrameSize)
212     {
213         _decoder = new DecoderImpl;
214         _encoder = new EncoderImpl(_decoder);
215         _remoteSessions = new LinkedHashMap!(int, TransportSession);
216         _localSessions = new LinkedHashMap!(int, TransportSession);
217         _protocolTracer = new Ref!ProtocolTracer(null);
218         _lastTransportResult = TransportResultFactory.ok();
219         cachedTransfer = new Transfer();
220         cachedFlow = new Flow();
221         cachedDisposition = new Disposition();
222         _remoteMaxFrameSize = Transport.MIN_MAX_FRAME_SIZE;
223         _maxFrameSize = Transport.DEFAULT_MAX_FRAME_SIZE;
224         _channelMax       = CHANNEL_MAX_LIMIT;
225         _remoteChannelMax  = CHANNEL_MAX_LIMIT;
226         AMQPDefinedTypes.registerAllTypes(_decoder, _encoder);
227 
228         _maxFrameSize = maxFrameSize;
229         _frameWriter = new FrameWriter(_encoder, _remoteMaxFrameSize,
230                                        FrameWriter.AMQP_FRAME_TYPE,
231                                        this);
232         _frameHandler = this;
233     }
234 
235     private void init()
236     {
237         if(!_init)
238         {
239             _init = true;
240             _frameParser = new FrameParser(_frameHandler , _decoder, _maxFrameSize, this);
241             _inputProcessor = _frameParser;
242             _outputProcessor = new TransportOutputAdaptor(this, _maxFrameSize, isUseReadOnlyOutputBuffer());
243         }
244     }
245 
246     public void trace(int levels) {
247         _levels = levels;
248     }
249 
250     public int getMaxFrameSize()
251     {
252         return _maxFrameSize;
253     }
254 
255     public int getRemoteMaxFrameSize()
256     {
257         return _remoteMaxFrameSize;
258     }
259 
260     public void setInitialRemoteMaxFrameSize(int remoteMaxFrameSize)
261     {
262         if(_init)
263         {
264             throw new IllegalStateException("Cannot set initial remote max frame size after transport has been initialised");
265         }
266 
267         _remoteMaxFrameSize = remoteMaxFrameSize;
268     }
269 
270     public void setMaxFrameSize(int maxFrameSize)
271     {
272         if(_init)
273         {
274             throw new IllegalStateException("Cannot set max frame size after transport has been initialised");
275         }
276         _maxFrameSize = maxFrameSize;
277     }
278 
279     public int getChannelMax()
280     {
281         return _channelMax;
282     }
283 
284     public void setChannelMax(int channelMax)
285     {
286         if(_isOpenSent)
287         {
288           throw new IllegalArgumentException("Cannot change channel max after open frame has been sent");
289         }
290 
291         if(channelMax < 0 || channelMax >= (1<<16))
292         {
293             throw new NumberFormatException("Value \"" ~ to!string(channelMax) ~"\" lies outside the range [0-" ~ to!string((1<<16) )~").");
294         }
295 
296         _channelMax = channelMax;
297     }
298 
299     override
300     public int getRemoteChannelMax()
301     {
302         return _remoteChannelMax;
303     }
304 
305     override
306     public ErrorCondition getCondition()
307     {
308         // Get the ErrorCondition, but only return it if its condition field is populated.
309         // This somewhat retains prior TransportImpl behaviour of returning null when no
310         // condition had been set (by TransportImpl itself) rather than the 'empty' ErrorCondition
311         // object historically used in the other areas.
312         ErrorCondition errorCondition = super.getCondition();
313         return errorCondition.getCondition() !is null ? errorCondition : null;
314     }
315 
316     override
317     public void setCondition(ErrorCondition error)
318     {
319         super.setCondition(error);
320         _conditionSet = error !is null && error.getCondition() !is null;
321     }
322 
323     override
324     public void bind(Connection conn)
325     {
326         // TODO - check if already bound
327 
328         _connectionEndpoint = cast(ConnectionImpl) conn;
329         put(Type.CONNECTION_BOUND, cast(Object)conn);
330         _connectionEndpoint.setTransport(this);
331         _connectionEndpoint.incref();
332 
333         if(getRemoteState() != EndpointState.UNINITIALIZED)
334         {
335             _connectionEndpoint.handleOpen(_open);
336             if(getRemoteState() == EndpointState.CLOSED)
337             {
338                 _connectionEndpoint.setRemoteState(EndpointState.CLOSED);
339             }
340 
341             _frameParser.flush();
342         }
343     }
344 
345 
346     FrameBody copy()
347     {
348         implementationMissing(false);
349         return null;
350     }
351 
352     override
353     public void unbind()
354     {
355         foreach (TransportSession ts ;_localSessions.values()) {
356             ts.unbind();
357         }
358         foreach (TransportSession ts ;_remoteSessions.values()) {
359             ts.unbind();
360         }
361 
362         put(Type.CONNECTION_UNBOUND, _connectionEndpoint);
363 
364         _connectionEndpoint.modifyEndpoints();
365         _connectionEndpoint.setTransport(null);
366         _connectionEndpoint.decref();
367     }
368 
369     override
370     public int input(byte[] bytes, int offset, int length)
371     {
372         oldApiCheckStateBeforeInput(length).checkIsOk();
373 
374         ByteBuffer inputBuffer = getInputBuffer();
375         int numberOfBytesConsumed = ByteBufferUtils.pourArrayToBuffer(bytes, offset, length, inputBuffer);
376         processInput().checkIsOk();
377         return numberOfBytesConsumed;
378     }
379 
380     /**
381      * This method is public as it is used by Python layer.
382      * @see hunt.proton.engine.Transport#input(byte[], int, int)
383      */
384     public TransportResult oldApiCheckStateBeforeInput(int inputLength)
385     {
386         _lastTransportResult.checkIsOk();
387         if(inputLength == 0)
388         {
389             if(_connectionEndpoint is null || _connectionEndpoint.getRemoteState() != EndpointState.CLOSED)
390             {
391                 return TransportResultFactory.error(new TransportException("Unexpected EOS when remote connection not closed: connection aborted"));
392             }
393         }
394         return TransportResultFactory.ok();
395     }
396 
397     //==================================================================================================================
398     // Process model state to generate output
399 
400     override
401     public int output(byte[] bytes, int offset, int size)
402     {
403         ByteBuffer outputBuffer = getOutputBuffer();
404         int numberOfBytesOutput = ByteBufferUtils.pourBufferToArray(outputBuffer, bytes, offset, size);
405         outputConsumed();
406         return numberOfBytesOutput;
407     }
408 
409     override
410     public bool writeInto(ByteBuffer outputBuffer)
411     {
412         processHeader();
413         processOpen();
414         processBegin();
415         processAttach();
416         processReceiverFlow();
417         // we process transport work twice intentionally, the first
418         // pass may end up settling deliveries that the second pass
419         // can clean up
420         processTransportWork();
421         processTransportWork();
422         processSenderFlow();
423         processDetach();
424         processEnd();
425         processClose();
426 
427         _frameWriter.readBytes(outputBuffer);
428 
429         return _isCloseSent || _head_closed;
430     }
431 
432     override
433     public Sasl sasl()
434     {
435         if(_sasl is null)
436         {
437             if(_processingStarted)
438             {
439                 throw new IllegalStateException("Sasl can't be initiated after transport has started processing");
440             }
441 
442             init();
443             _sasl = new SaslImpl(this, _remoteMaxFrameSize);
444             TransportWrapper transportWrapper = _sasl.wrap(_inputProcessor, _outputProcessor);
445             _inputProcessor = transportWrapper;
446             _outputProcessor = transportWrapper;
447         }
448         return _sasl;
449 
450     }
451 
452     /**
453      * {@inheritDoc}
454      *
455      * <p>Note that sslDomain must implement {@link hunt.proton.engine.impl.ssl.ProtonSslEngineProvider}.
456      * This is not possible enforce at the API level because {@link hunt.proton.engine.impl.ssl.ProtonSslEngineProvider} is not part of the
457      * public Proton API.</p>
458      */
459     override
460     public Ssl ssl(SslDomain sslDomain, SslPeerDetails sslPeerDetails)
461     {
462         implementationMissing(false);
463         return null;
464         //if (_ssl is null)
465         //{
466         //    init();
467         //    _ssl = new SslImpl(sslDomain, sslPeerDetails);
468         //    TransportWrapper transportWrapper = _ssl.wrap(_inputProcessor, _outputProcessor);
469         //    _inputProcessor = transportWrapper;
470         //    _outputProcessor = transportWrapper;
471         //}
472         //return _ssl;
473     }
474 
475     override
476     public Ssl ssl(SslDomain sslDomain)
477     {
478         return ssl(sslDomain, null);
479     }
480 
481     private void processDetach()
482     {
483        // logInfo("processDetach out -------------------------------------");
484         if(_connectionEndpoint !is null && _isOpenSent)
485         {
486             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
487             while(endpoint !is null)
488             {
489                 LinkImpl link = cast(LinkImpl)endpoint;
490                 if(link !is null)
491                 {
492                     TransportLink  transportLink = getTransportState(link);
493                     SessionImpl session = link.getSession();
494                     TransportSession transportSession = getTransportState(session);
495                    // logInfo("processDetach in -------------------------------------");
496                     if(((link.getLocalState() == EndpointState.CLOSED) || link.detached())
497                        && transportLink.isLocalHandleSet()
498                        && transportSession.isLocalChannelSet()
499                        && !_isCloseSent)
500                     {
501                         if((cast(SenderImpl)link !is null)
502                            && link.getQueued() > 0
503                            && !transportLink.detachReceived()
504                            && !transportSession.endReceived()
505                            && !_closeReceived) {
506                             endpoint = endpoint.transportNext();
507                             continue;
508                         }
509 
510                         UnsignedInteger localHandle = transportLink.getLocalHandle();
511                         transportLink.clearLocalHandle();
512                         transportSession.freeLocalHandle(localHandle);
513 
514                         Detach detach = new Detach();
515                         detach.setHandle(localHandle);
516                         detach.setClosed(new Boolean(!link.detached()));
517 
518                         ErrorCondition localError = link.getCondition();
519                         if( localError.getCondition() !is null )
520                         {
521                             detach.setError(localError);
522                         }
523 
524                         writeFrame(transportSession.getLocalChannel(), detach, null, null);
525                     }
526 
527                     endpoint.clearModified();
528 
529                 }
530                 endpoint = endpoint.transportNext();
531             }
532         }
533     }
534 
535     private void writeFlow(TransportSession ssn, TransportLink link)
536     {
537         cachedFlow.setNextIncomingId(ssn.getNextIncomingId());
538         cachedFlow.setNextOutgoingId(ssn.getNextOutgoingId());
539         ssn.updateIncomingWindow();
540         cachedFlow.setIncomingWindow(ssn.getIncomingWindowSize());
541         cachedFlow.setOutgoingWindow(ssn.getOutgoingWindowSize());
542         cachedFlow.setProperties(null);
543         if (link !is null) {
544             cachedFlow.setHandle(link.getLocalHandle());
545             cachedFlow.setDeliveryCount(link.getDeliveryCount());
546             cachedFlow.setLinkCredit(link.getLinkCredit());
547             cachedFlow.setDrain(new Boolean((cast(LinkImpl)link.getLink()).getDrain()));
548         } else {
549             cachedFlow.setHandle(null);
550             cachedFlow.setDeliveryCount(null);
551             cachedFlow.setLinkCredit(null);
552             cachedFlow.setDrain(Boolean.FALSE);
553         }
554         writeFrame(ssn.getLocalChannel(), cachedFlow, null, null);
555     }
556 
557     private void processSenderFlow()
558     {
559         if(_connectionEndpoint !is null && _isOpenSent && !_isCloseSent)
560         {
561             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
562             while(endpoint !is null)
563             {
564                  SenderImpl sender = cast(SenderImpl) endpoint;
565                 if(sender !is null)
566                 {
567                     if(sender.getDrain() && sender.getDrained() > 0)
568                     {
569                         TransportSender transportLink = sender.getTransportLink();
570                         TransportSession transportSession = sender.getSession().getTransportSession();
571                         UnsignedInteger credits = transportLink.getLinkCredit();
572                         transportLink.setLinkCredit(UnsignedInteger.ZERO);
573                         transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(credits));
574                         sender.setDrained(0);
575 
576                         writeFlow(transportSession, transportLink);
577                     }
578                 }
579 
580                 endpoint = endpoint.transportNext();
581             }
582         }
583     }
584 
585     private void processTransportWork()
586     {
587         if(_connectionEndpoint !is null && _isOpenSent && !_isCloseSent)
588         {
589             DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
590             while(delivery !is null)
591             {
592                 LinkImpl link = delivery.getLink();
593                 if (cast(SenderImpl)link !is null) {
594                     if (processTransportWorkSender(delivery, cast(SenderImpl) link)) {
595                         delivery = delivery.clearTransportWork();
596                     } else {
597                         delivery = delivery.getTransportWorkNext();
598                     }
599                 } else {
600                     if (processTransportWorkReceiver(delivery, cast(ReceiverImpl) link)) {
601                         delivery = delivery.clearTransportWork();
602                     } else {
603                         delivery = delivery.getTransportWorkNext();
604                     }
605                 }
606             }
607         }
608     }
609 
610     private bool processTransportWorkSender(DeliveryImpl delivery,
611                                                SenderImpl snd)
612     {
613         TransportSender tpLink = snd.getTransportLink();
614         SessionImpl session = snd.getSession();
615         TransportSession tpSession = session.getTransportSession();
616 
617         bool wasDone = delivery.isDone();
618 
619         if(!delivery.isDone() &&
620            (delivery.getDataLength() > 0 || delivery != snd.current()) &&
621            tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
622            tpSession.isLocalChannelSet() &&
623            tpLink.getLocalHandle() !is null && !_frameWriter.isFull())
624         {
625             DeliveryImpl inProgress = tpLink.getInProgressDelivery();
626             if(inProgress !is null){
627                 // There is an existing Delivery awaiting completion. Check it
628                 // is the same Delivery object given and return if not, as we
629                 // can't interleave Transfer frames for deliveries on a link.
630                 if(inProgress != delivery) {
631                     return false;
632                 }
633             }
634 
635             TransportDelivery tpDelivery = delivery.getTransportDelivery();
636             UnsignedInteger deliveryId;
637             if (tpDelivery !is null) {
638                 deliveryId = tpDelivery.getDeliveryId();
639             } else {
640                 deliveryId = tpSession.getOutgoingDeliveryId();
641                 tpSession.incrementOutgoingDeliveryId();
642             }
643             tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
644             delivery.setTransportDelivery(tpDelivery);
645 
646             cachedTransfer.setDeliveryId(deliveryId);
647             cachedTransfer.setDeliveryTag(new Binary(delivery.getTag()));
648             cachedTransfer.setHandle(tpLink.getLocalHandle());
649             cachedTransfer.setRcvSettleMode(null);
650             cachedTransfer.setResume(Boolean.FALSE); // Ensure default is written
651             cachedTransfer.setAborted(Boolean.FALSE); // Ensure default is written
652             cachedTransfer.setBatchable(Boolean.FALSE); // Ensure default is written
653 
654             if(delivery.getLocalState() !is null)
655             {
656                 cachedTransfer.setState(delivery.getLocalState());
657             }
658             else
659             {
660                 cachedTransfer.setState(null);
661             }
662 
663             if(delivery.isSettled())
664             {
665                 cachedTransfer.setSettled(Boolean.TRUE);
666             }
667             else
668             {
669                 cachedTransfer.setSettled(Boolean.FALSE);
670                 tpSession.addUnsettledOutgoing(deliveryId, delivery);
671             }
672 
673             if(snd.current() == delivery)
674             {
675                 cachedTransfer.setMore(Boolean.TRUE);
676             }
677             else
678             {
679                 // Partial transfers will reset this as needed to true in the FrameWriter
680                 cachedTransfer.setMore(Boolean.FALSE);
681             }
682 
683             int messageFormat = delivery.getMessageFormat();
684             if(messageFormat == DeliveryImpl.DEFAULT_MESSAGE_FORMAT) {
685                 cachedTransfer.setMessageFormat(UnsignedInteger.ZERO);
686             } else {
687                 cachedTransfer.setMessageFormat(UnsignedInteger.valueOf(messageFormat));
688             }
689 
690             ReadableBuffer payload = delivery.getData();
691 
692             int pending = payload.remaining();
693             //logInfo("pending !!! %d",pending);
694 
695             try {
696                 writeFrame(tpSession.getLocalChannel(), cachedTransfer, payload, new class Runnable {
697                     void run()
698                     {
699                         cachedTransfer.setMore(Boolean.TRUE);
700                     }
701                 });
702             } finally {
703                 delivery.afterSend();  // Allow for freeing resources after write of buffered data
704             }
705 
706             tpSession.incrementOutgoingId();
707             tpSession.decrementRemoteIncomingWindow();
708 
709             if (payload is null || !payload.hasRemaining())
710             {
711                 session.incrementOutgoingBytes(-pending);
712 
713                 if (!cachedTransfer.getMore().booleanValue) {
714                     // Clear the in-progress delivery marker
715                     tpLink.setInProgressDelivery(null);
716 
717                     delivery.setDone();
718                     tpLink.setDeliveryCount(tpLink.getDeliveryCount().add(UnsignedInteger.ONE));
719                     tpLink.setLinkCredit(tpLink.getLinkCredit().subtract(UnsignedInteger.ONE));
720                     session.incrementOutgoingDeliveries(-1);
721                     snd.decrementQueued();
722                 }
723             }
724             else
725             {
726                 session.incrementOutgoingBytes(-(pending - payload.remaining()));
727 
728                 // Remember the delivery we are still processing
729                 // the body transfer frames for
730                 tpLink.setInProgressDelivery(delivery);
731             }
732 
733             if (_emitFlowEventOnSend && snd.getLocalState() != EndpointState.CLOSED) {
734                 getConnectionImpl().put(Type.LINK_FLOW, snd);
735             }
736         }
737 
738         if(wasDone && delivery.getLocalState() !is null)
739         {
740             TransportDelivery tpDelivery = delivery.getTransportDelivery();
741             // Use cached object as holder of data for immediate write to the FrameWriter
742             cachedDisposition.setFirst(tpDelivery.getDeliveryId());
743             cachedDisposition.setLast(tpDelivery.getDeliveryId());
744             cachedDisposition.setRole(Role.SENDER);
745             cachedDisposition.setSettled(new Boolean(delivery.isSettled()));
746             cachedDisposition.setBatchable(Boolean.FALSE);  // Enforce default is written
747             if(delivery.isSettled())
748             {
749                 tpDelivery.settled();
750             }
751             cachedDisposition.setState(delivery.getLocalState());
752 
753             writeFrame(tpSession.getLocalChannel(), cachedDisposition, null, null);
754         }
755 
756         return !delivery.isBuffered();
757     }
758 
759     private bool processTransportWorkReceiver(DeliveryImpl delivery, ReceiverImpl rcv)
760     {
761         TransportDelivery tpDelivery = delivery.getTransportDelivery();
762         SessionImpl session = rcv.getSession();
763         TransportSession tpSession = session.getTransportSession();
764 
765         if (tpSession.isLocalChannelSet())
766         {
767             bool settled = delivery.isSettled();
768             DeliveryState localState = delivery.getLocalState();
769             // Use cached object as holder of data for immediate write to the FrameWriter
770             cachedDisposition.setFirst(tpDelivery.getDeliveryId());
771             cachedDisposition.setLast(tpDelivery.getDeliveryId());
772             cachedDisposition.setRole(Role.RECEIVER);
773             cachedDisposition.setSettled(new Boolean(settled));
774             cachedDisposition.setState(localState);
775             cachedDisposition.setBatchable(new Boolean(false));  // Enforce default is written
776 
777             if(localState is null && settled) {
778                 cachedDisposition.setState(delivery.getDefaultDeliveryState());
779             }
780 
781             writeFrame(tpSession.getLocalChannel(), cachedDisposition, null, null);
782             if (settled)
783             {
784                 tpDelivery.settled();
785             }
786             return true;
787         }
788 
789         return false;
790     }
791 
792     private void processReceiverFlow()
793     {
794        // logInfo("processReceiverFlow out ------------------------");
795         if(_connectionEndpoint !is null && _isOpenSent && !_isCloseSent)
796         {
797             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
798             while(endpoint !is null)
799             {
800                  ReceiverImpl receiver = cast(ReceiverImpl) endpoint;
801                 if(receiver !is null)
802                 {
803                     TransportLink transportLink = getTransportState(receiver);
804                     TransportSession transportSession = getTransportState(receiver.getSession());
805 
806                     if(receiver.getLocalState() == EndpointState.ACTIVE && transportSession.isLocalChannelSet() && !receiver.detached())
807                     {
808                         int credits = receiver.clearUnsentCredits();
809                         //logInfo("processReceiverFlow in ------------------------");
810                         if(credits != 0 || receiver.getDrain() ||
811                            transportSession.getIncomingWindowSize() == (UnsignedInteger.ZERO))
812                         {
813                             //logInfo("processReceiverFlow in in ------------------------");
814                             transportLink.addCredit(credits);
815                             writeFlow(transportSession, transportLink);
816                         }
817                     }
818                 }
819                 endpoint = endpoint.transportNext();
820             }
821             endpoint = _connectionEndpoint.getTransportHead();
822             while(endpoint !is null)
823             {
824                 SessionImpl session = cast(SessionImpl) endpoint;
825                 if(session !is null)
826                 {
827                     TransportSession transportSession = getTransportState(session);
828                    // logInfo("processReceiverFlow in ------------------------");
829                     if(session.getLocalState() == EndpointState.ACTIVE && transportSession.isLocalChannelSet())
830                     {
831                         if(transportSession.getIncomingWindowSize()==(UnsignedInteger.ZERO))
832                         {
833                            // logInfo("processReceiverFlow in in------------------------");
834                             writeFlow(transportSession, null);
835                         }
836                     }
837                 }
838                 endpoint = endpoint.transportNext();
839             }
840         }
841     }
842 
843     private void processAttach()
844     {
845         if(_connectionEndpoint !is null && _isOpenSent && !_isCloseSent)
846         {
847             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
848 
849             while(endpoint !is null)
850             {
851                 LinkImpl link = cast(LinkImpl) endpoint;
852                 if(link !is null)
853                 {
854                     TransportLink transportLink = getTransportState(link);
855                     SessionImpl session = link.getSession();
856                     TransportSession transportSession = getTransportState(session);
857                     if(link.getLocalState() != EndpointState.UNINITIALIZED && !transportLink.attachSent() && transportSession.isLocalChannelSet())
858                     {
859 
860                         if( (link.getRemoteState() == EndpointState.ACTIVE
861                             && !transportLink.isLocalHandleSet()) || link.getRemoteState() == EndpointState.UNINITIALIZED)
862                         {
863 
864                             UnsignedInteger localHandle = transportSession.allocateLocalHandle(transportLink);
865 
866                             if(link.getRemoteState() == EndpointState.UNINITIALIZED)
867                             {
868                                // logInfo("processAttach UNINITIALIZED ...........................");
869                                 transportSession.addHalfOpenLink(transportLink);
870                             }
871 
872                             Attach attach = new Attach();
873                             attach.setHandle(localHandle);
874                             attach.setName(transportLink.getName() is null ? null : new String(transportLink.getName()));
875 
876                             if(link.getSenderSettleMode() !is null)
877                             {
878                                 //logInfo("processAttach getSenderSettleMode ......................... %d ",link.getSenderSettleMode().getEnum);
879                                 attach.setSndSettleMode(link.getSenderSettleMode());
880                             }
881 
882                             if(link.getReceiverSettleMode() !is null)
883                             {
884                                 //logInfo("processAttach getReceiverSettleMode ......................... %d ",link.getReceiverSettleMode().getEnum);
885                                 attach.setRcvSettleMode(link.getReceiverSettleMode());
886                             }
887 
888                             if(link.getSource() !is null)
889                             {
890                                attach.setSource(link.getSource());
891                             }
892 
893                             if(link.getTarget() !is null)
894                             {
895                                 attach.setTarget(link.getTarget());
896                             }
897 
898                             if(link.getProperties() !is null)
899                             {
900                                 //attach.setProperties(link.getProperties());
901                             }
902 
903                             if(link.getOfferedCapabilities() !is null)
904                             {
905                                // attach.setOfferedCapabilities(new ArrayList!Symbol(link.getOfferedCapabilities()));
906                             }
907 
908                             if(link.getDesiredCapabilities() !is null)
909                             {
910                                // attach.setDesiredCapabilities(new ArrayList!Symbol (link.getDesiredCapabilities()));
911                             }
912 
913                             if(link.getMaxMessageSize() !is null)
914                             {
915                                // logInfo("processAttach getMaxMessageSize ......................... %d ",link.getMaxMessageSize().intValue);
916                                 attach.setMaxMessageSize(link.getMaxMessageSize());
917                             }
918 
919                             attach.setRole(cast(ReceiverImpl)endpoint !is null ? Role.RECEIVER : Role.SENDER);
920 
921                             if(cast(SenderImpl)link !is null)
922                             {
923                                 //logInfo("processAttach setInitialDeliveryCount ......................... ");
924                                 attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
925                             }
926 
927                             writeFrame(transportSession.getLocalChannel(), attach, null, null);
928                             transportLink.sentAttach();
929                         }
930                     }
931                 }
932                 endpoint = endpoint.transportNext();
933             }
934         }
935     }
936 
937     private void processHeader()
938     {
939        // logInfo("processHeader  out ------------------------");
940         if(!_headerWritten)
941         {
942            // logInfo("processHeader  in ------------------------");
943             outputHeaderDescription();
944             _frameWriter.writeHeader(AmqpHeader.HEADER);
945             _headerWritten = true;
946         }
947     }
948 
949     private void outputHeaderDescription()
950     {
951         if (isFrameTracingEnabled())
952         {
953             log(TransportImpl.OUTGOING, new String(HEADER_DESCRIPTION));
954 
955             ProtocolTracer tracer = getProtocolTracer();
956             if (tracer !is null)
957             {
958                 tracer.sentHeader(HEADER_DESCRIPTION);
959             }
960         }
961     }
962 
963     private void processOpen()
964     {
965        // logInfo("processOpen  out -------------------------------");
966         if (!_isOpenSent && (_conditionSet ||
967              (_connectionEndpoint !is null &&
968               _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED)))
969         {
970            // logInfo("processOpen  in -------------------------------");
971             Open open = new Open();
972             if (_connectionEndpoint !is null) {
973                // logInfo("processOpen  in in-------------------------------");
974                 string cid = _connectionEndpoint.getLocalContainerId();
975                 open.setContainerId( new String( cid is null ? "" : cid));
976                 open.setHostname(_connectionEndpoint.getHostname()is null ? null : new String(_connectionEndpoint.getHostname()));
977                 //open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities().length == 0? null : new ArrayList!Symbol(_connectionEndpoint.getDesiredCapabilities()));
978                 //open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities().length == 0? null :new ArrayList!Symbol(_connectionEndpoint.getOfferedCapabilities()));
979                 //open.setProperties(_connectionEndpoint.getProperties());
980                 open.setDesiredCapabilities(null);
981                 open.setOfferedCapabilities(null);
982                 open.setProperties(null);
983             } else {
984                 open.setContainerId(new String(""));
985             }
986 
987             if (_maxFrameSize > 0) {
988                 open.setMaxFrameSize(UnsignedInteger.valueOf(_maxFrameSize));
989             }
990             if (_channelMax > 0) {
991                 open.setChannelMax(UnsignedShort.valueOf(cast(short) _channelMax));
992             }
993 
994             // as per the recommendation in the spec, advertise half our
995             // actual timeout to the remote
996             if (_localIdleTimeout > 0) {
997                 open.setIdleTimeOut(new UnsignedInteger(_localIdleTimeout / 2));
998             }
999             _isOpenSent = true;
1000 
1001             writeFrame(0, open, null, null);
1002         }
1003     }
1004 
1005     private void processBegin()
1006     {
1007        // logInfo("processBegin out -----------------------------");
1008         if(_connectionEndpoint !is null && _isOpenSent && !_isCloseSent)
1009         {
1010             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
1011             while(endpoint !is null)
1012             {
1013                 SessionImpl session = cast(SessionImpl) endpoint;
1014                 if(session !is null)
1015                 {
1016                     TransportSession transportSession = getTransportState(session);
1017                     if(session.getLocalState() != EndpointState.UNINITIALIZED && !transportSession.beginSent())
1018                     {
1019                         int channelId = allocateLocalChannel(transportSession);
1020                         Begin begin = new Begin();
1021 
1022                         if(session.getRemoteState() != EndpointState.UNINITIALIZED)
1023                         {
1024                            // logInfo("processBegin in in-----------------------------");
1025                             begin.setRemoteChannel(UnsignedShort.valueOf(cast(short) transportSession.getRemoteChannel()));
1026                         }
1027                        // logInfo("processBegin in -----------------------------");
1028                         transportSession.updateIncomingWindow();
1029 
1030                         begin.setHandleMax(transportSession.getHandleMax());
1031                         begin.setIncomingWindow(transportSession.getIncomingWindowSize());
1032                         begin.setOutgoingWindow(transportSession.getOutgoingWindowSize());
1033                         //logInfo("getOutgoingWindowSize :%d",transportSession.getOutgoingWindowSize().intValue);
1034                         begin.setNextOutgoingId(transportSession.getNextOutgoingId());
1035 
1036                         if(session.getProperties() !is null)
1037                         {
1038                            // begin.setProperties(session.getProperties());
1039                         }
1040 
1041                         if(session.getOfferedCapabilities() !is null)
1042                         {
1043                             //begin.setOfferedCapabilities(new ArrayList!Symbol(session.getOfferedCapabilities()));
1044                         }
1045 
1046                         if(session.getDesiredCapabilities() !is null)
1047                         {
1048                            // begin.setDesiredCapabilities(new ArrayList!Symbol (session.getDesiredCapabilities()));
1049                         }
1050 
1051                         writeFrame(channelId, begin, null, null);
1052                         transportSession.sentBegin();
1053                     }
1054                 }
1055                 endpoint = endpoint.transportNext();
1056             }
1057         }
1058     }
1059 
1060     private TransportSession getTransportState(SessionImpl session)
1061     {
1062         TransportSession transportSession = session.getTransportSession();
1063         if(transportSession is null)
1064         {
1065             transportSession = new TransportSession(this, session);
1066             session.setTransportSession(transportSession);
1067         }
1068         return transportSession;
1069     }
1070 
1071     private TransportLink getTransportState(LinkImpl link)
1072     {
1073         TransportLink transportLink = link.getTransportLink();
1074         if(transportLink is null)
1075         {
1076             transportLink = TransportLink.createTransportLink(link);
1077         }
1078         return transportLink;
1079     }
1080 
1081     private int allocateLocalChannel(TransportSession transportSession)
1082     {
1083         for (int i = 0; i < _connectionEndpoint.getMaxChannels(); i++)
1084         {
1085             if (!_localSessions.containsKey(i))
1086             {
1087                 _localSessions.put(i, transportSession);
1088                 transportSession.setLocalChannel(i);
1089                 return i;
1090             }
1091         }
1092 
1093         return -1;
1094     }
1095 
1096     private int freeLocalChannel(TransportSession transportSession)
1097     {
1098         int channel = transportSession.getLocalChannel();
1099         _localSessions.remove(channel);
1100         transportSession.freeLocalChannel();
1101         return channel;
1102     }
1103 
1104     private void processEnd()
1105     {
1106        // logInfo("processEnd out -----------------------------");
1107         if(_connectionEndpoint !is null && _isOpenSent)
1108         {
1109             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
1110             while(endpoint !is null)
1111             {
1112                 SessionImpl session;
1113                 TransportSession transportSession;
1114 
1115                 if((cast(SessionImpl)endpoint !is null)) {
1116                     if ((session = cast(SessionImpl)endpoint).getLocalState() == EndpointState.CLOSED
1117                         && (transportSession = session.getTransportSession()).isLocalChannelSet()
1118                         && !_isCloseSent)
1119                     {
1120                         if (hasSendableMessages(session)) {
1121                             endpoint = endpoint.transportNext();
1122                             continue;
1123                         }
1124 
1125                         int channel = freeLocalChannel(transportSession);
1126                         End end = new End();
1127                         ErrorCondition localError = endpoint.getCondition();
1128                         if( localError.getCondition() !is null )
1129                         {
1130                             end.setError(localError);
1131                         }
1132 
1133                         writeFrame(channel, end, null, null);
1134                     }
1135                    // logInfo("processEnd in -----------------------------");
1136                     endpoint.clearModified();
1137                 }
1138 
1139                 endpoint = endpoint.transportNext();
1140             }
1141         }
1142     }
1143 
1144     private bool hasSendableMessages(SessionImpl session)
1145     {
1146         if (_connectionEndpoint is null) {
1147             return false;
1148         }
1149 
1150         if(!_closeReceived && (session is null || !session.getTransportSession().endReceived()))
1151         {
1152             EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
1153             while(endpoint !is null)
1154             {
1155                 SenderImpl sender = cast(SenderImpl) endpoint;
1156                 if(sender !is null)
1157                 {
1158                     if((session is null || sender.getSession() == session)
1159                        && sender.getQueued() != 0
1160                         && !getTransportState(sender).detachReceived())
1161                     {
1162                         return true;
1163                     }
1164                 }
1165                 endpoint = endpoint.transportNext();
1166             }
1167         }
1168         return false;
1169     }
1170 
1171     private void processClose()
1172     {
1173         version(HUNT_DEDBUG) info("Closing...");
1174         if ((_conditionSet ||
1175              (_connectionEndpoint !is null &&
1176               _connectionEndpoint.getLocalState() == EndpointState.CLOSED)) &&
1177             !_isCloseSent) {
1178             if(!hasSendableMessages(null))
1179             {
1180                 Close close = new Close();
1181                 ErrorCondition localError;
1182 
1183                 if (_connectionEndpoint is null) {
1184                     localError = getCondition();
1185                 } else {
1186                     localError =  _connectionEndpoint.getCondition();
1187                 }
1188 
1189                 if(localError !is null && localError.getCondition() !is null)
1190                 {
1191                     close.setError(localError);
1192                 }
1193 
1194                 _isCloseSent = true;
1195 
1196                 writeFrame(0, close, null, null);
1197 
1198                 if (_connectionEndpoint !is null) {
1199                     _connectionEndpoint.clearModified();
1200                 }
1201             }
1202         }
1203     }
1204 
1205     protected void writeFrame(int channel, FrameBody frameBody,
1206                               ReadableBuffer payload, Runnable onPayloadTooLarge)
1207     {
1208         _frameWriter.writeFrame(channel, cast(Object)frameBody, payload, onPayloadTooLarge);
1209     }
1210 
1211     //==================================================================================================================
1212 
    /**
     * Returns the connection endpoint currently held in `_connectionEndpoint`
     * (may be null, as the null checks elsewhere in this class show).
     */
    override
    public ConnectionImpl getConnectionImpl()
    {
        return _connectionEndpoint;
    }
1218 
    /// Intentionally empty override: this class performs no work in postFinal().
    override
    void postFinal() {}
1221 
    /// Intentionally empty override: this class performs no work in doFree().
    override
    void doFree() { }
1224 
1225     //==================================================================================================================
1226     // handle incoming amqp data
1227 
1228 
1229     public void handleOpen(Open open, Binary payload, int channel)
1230     {
1231         setRemoteState(EndpointState.ACTIVE);
1232         if(_connectionEndpoint !is null)
1233         {
1234             _connectionEndpoint.handleOpen(open);
1235         }
1236         else
1237         {
1238             _open = open;
1239         }
1240 
1241         int effectiveMaxFrameSize = _remoteMaxFrameSize;
1242         if(open.getMaxFrameSize().longValue() > 0)
1243         {
1244             _remoteMaxFrameSize = cast(int) open.getMaxFrameSize().longValue();
1245             effectiveMaxFrameSize = cast(int) min(open.getMaxFrameSize().longValue(), Integer.MAX_VALUE);
1246         }
1247 
1248         if(_outboundFrameSizeLimit > 0) {
1249             effectiveMaxFrameSize = cast(int) min(open.getMaxFrameSize().longValue(), _outboundFrameSizeLimit);
1250         }
1251 
1252         _frameWriter.setMaxFrameSize(effectiveMaxFrameSize);
1253 
1254         if (open.getChannelMax().longValue() > 0)
1255         {
1256             _remoteChannelMax = cast(int) open.getChannelMax().longValue();
1257         }
1258 
1259         if (open.getIdleTimeOut() !is null && open.getIdleTimeOut().longValue() > 0)
1260         {
1261             _remoteIdleTimeout = open.getIdleTimeOut().intValue();
1262         }
1263     }
1264 
1265     public void handleBegin(Begin begin, Binary payload, int channel)
1266     {
1267         // TODO - check channel < max_channel
1268         TransportSession transportSession = _remoteSessions.get(channel);
1269         if(transportSession !is null)
1270         {
1271             // TODO - fail due to begin on begun session
1272         }
1273         else
1274         {
1275             SessionImpl session;
1276             if(begin.getRemoteChannel() is null)
1277             {
1278                 session = _connectionEndpoint.session();
1279                 transportSession = getTransportState(session);
1280             }
1281             else
1282             {
1283                 transportSession = _localSessions.get(begin.getRemoteChannel().intValue());
1284                 if (transportSession is null) {
1285                     // TODO handle failure rather than just throwing a nicer NPE
1286                     throw new NullPointerException("uncorrelated channel: ");
1287                 }
1288                 session = transportSession.getSession();
1289 
1290             }
1291             transportSession.setRemoteChannel(channel);
1292             session.setRemoteState(EndpointState.ACTIVE);
1293             transportSession.setNextIncomingId(begin.getNextOutgoingId());
1294             session.setRemoteProperties(begin.getProperties());
1295             if (begin.getDesiredCapabilities() !is null)
1296             {
1297                 session.setRemoteDesiredCapabilities(begin.getDesiredCapabilities().toArray());
1298             }
1299             if (begin.getOfferedCapabilities() !is null)
1300             {
1301                 session.setRemoteOfferedCapabilities(begin.getOfferedCapabilities().toArray());
1302             }
1303             _remoteSessions.put(channel, transportSession);
1304 
1305             _connectionEndpoint.put(Type.SESSION_REMOTE_OPEN, session);
1306         }
1307 
1308     }
1309 
1310     public void handleAttach(Attach attach, Binary payload, int channel)
1311     {
1312         TransportSession transportSession = _remoteSessions.get(channel);
1313         if(transportSession is null)
1314         {
1315             // TODO - fail due to attach on non-begun session
1316         }
1317         else
1318         {
1319             SessionImpl session = transportSession.getSession();
1320             UnsignedInteger handle = attach.getHandle();
1321             if (handle > (transportSession.getHandleMax())) {
1322                 // The handle-max value is the highest handle value that can be used on the session. A peer MUST
1323                 // NOT attempt to attach a link using a handle value outside the range that its partner can handle.
1324                 // A peer that receives a handle outside the supported range MUST close the connection with the
1325                 // framing-error error-code.
1326                 ErrorCondition condition =
1327                         new ErrorCondition(ConnectionError.FRAMING_ERROR,
1328                                                             new String("handle-max exceeded"));
1329                 _connectionEndpoint.setCondition(condition);
1330                 _connectionEndpoint.setLocalState(EndpointState.CLOSED);
1331                 if (!_isCloseSent) {
1332                     Close close = new Close();
1333                     close.setError(condition);
1334                     _isCloseSent = true;
1335                     writeFrame(0, close, null, null);
1336                 }
1337                 close_tail();
1338                 return;
1339             }
1340             TransportLink transportLink = transportSession.getLinkFromRemoteHandle(handle);
1341             LinkImpl link = null;
1342 
1343             if(transportLink !is null)
1344             {
1345                 // TODO - fail - attempt attach on a handle which is in use
1346             }
1347             else
1348             {
1349                 transportLink = transportSession.resolveHalfOpenLink(cast(string)(attach.getName().getBytes()));
1350                 if(transportLink is null)
1351                 {
1352 
1353                     link = (attach.getRole() == Role.RECEIVER)
1354                            ? session.sender(cast(string)(attach.getName().getBytes()))
1355                            : session.receiver(cast(string)(attach.getName().getBytes()));
1356                     transportLink = getTransportState(link);
1357                 }
1358                 else
1359                 {
1360                     link = cast(LinkImpl)(transportLink.getLink());
1361                 }
1362                 if(attach.getRole() == Role.SENDER)
1363                 {
1364                     transportLink.setDeliveryCount(attach.getInitialDeliveryCount());
1365                 }
1366 
1367                 link.setRemoteState(EndpointState.ACTIVE);
1368                 link.setRemoteSource(attach.getSource());
1369                 link.setRemoteTarget(attach.getTarget());
1370 
1371                 link.setRemoteReceiverSettleMode(attach.getRcvSettleMode());
1372                 link.setRemoteSenderSettleMode(attach.getSndSettleMode());
1373 
1374                 link.setRemoteProperties(attach.getProperties());
1375 
1376                 if (attach.getDesiredCapabilities() !is null)
1377                 {
1378                     link.setRemoteDesiredCapabilities(attach.getDesiredCapabilities().toArray());
1379                 }
1380                 if (attach.getOfferedCapabilities() !is null)
1381                 {
1382                     link.setRemoteOfferedCapabilities(attach.getOfferedCapabilities().toArray());
1383                 }
1384 
1385 
1386                 link.setRemoteMaxMessageSize(attach.getMaxMessageSize());
1387 
1388                 transportLink.setName(cast(string)(attach.getName().getBytes()));
1389                 transportLink.setRemoteHandle(handle);
1390                 transportSession.addLinkRemoteHandle(transportLink, handle);
1391 
1392             }
1393 
1394             _connectionEndpoint.put(Type.LINK_REMOTE_OPEN, link);
1395         }
1396     }
1397 
1398     public void handleFlow(Flow flow, Binary payload, int channel)
1399     {
1400         TransportSession transportSession = _remoteSessions.get(channel);
1401         if(transportSession is null)
1402         {
1403             // TODO - fail due to attach on non-begun session
1404         }
1405         else
1406         {
1407             transportSession.handleFlow(flow);
1408         }
1409 
1410     }
1411 
1412     public void handleTransfer(Transfer transfer, Binary payload, int channel)
1413     {
1414         // TODO - check channel < max_channel
1415         TransportSession transportSession = _remoteSessions.get(channel);
1416         if(transportSession !is null)
1417         {
1418             transportSession.handleTransfer(transfer, payload);
1419         }
1420         else
1421         {
1422             // TODO - fail due to begin on begun session
1423         }
1424     }
1425 
1426     public void handleDisposition(Disposition disposition, Binary payload, int channel)
1427     {
1428         TransportSession transportSession = _remoteSessions.get(channel);
1429         if(transportSession is null)
1430         {
1431             // TODO - fail due to attach on non-begun session
1432         }
1433         else
1434         {
1435             transportSession.handleDisposition(disposition);
1436         }
1437     }
1438 
1439     public void handleDetach(Detach detach, Binary payload, int channel)
1440     {
1441         TransportSession transportSession = _remoteSessions.get(channel);
1442         if(transportSession is null)
1443         {
1444             // TODO - fail due to attach on non-begun session
1445         }
1446         else
1447         {
1448             TransportLink transportLink = transportSession.getLinkFromRemoteHandle(detach.getHandle());
1449 
1450             if(transportLink !is null)
1451             {
1452                 LinkImpl link = cast(LinkImpl)(transportLink.getLink());
1453                 transportLink.receivedDetach();
1454                 transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
1455 
1456                 Boolean b = detach.getClosed();
1457                 if (b !is null && b.booleanValue()) {
1458                     _connectionEndpoint.put(Type.LINK_REMOTE_CLOSE, link);
1459                 } else {
1460                     _connectionEndpoint.put(Type.LINK_REMOTE_DETACH, link);
1461                 }
1462                 transportLink.clearRemoteHandle();
1463                 link.setRemoteState(EndpointState.CLOSED);
1464                 if(detach.getError() !is null)
1465                 {
1466                     link.getRemoteCondition().copyFrom(detach.getError());
1467                 }
1468             }
1469             else
1470             {
1471                 // TODO - fail - attempt attach on a handle which is in use
1472             }
1473         }
1474     }
1475 
     /**
      * Unimplemented stub: ignores all three arguments and only calls
      * implementationMissing(false).
      *
      * NOTE(review): presumably implementationMissing(false) reports the gap
      * without throwing - confirm against its definition.
      */
     void invoke(FrameBodyHandler!int handler, Binary payload, int context)
     {
         implementationMissing(false);
     }
1480 
1481     public void handleEnd(End end, Binary payload, int channel)
1482     {
1483         TransportSession transportSession = _remoteSessions.get(channel);
1484         if(transportSession is null)
1485         {
1486             // TODO - fail due to attach on non-begun session
1487         }
1488         else
1489         {
1490             _remoteSessions.remove(channel);
1491             transportSession.receivedEnd();
1492             transportSession.unsetRemoteChannel();
1493             SessionImpl session = transportSession.getSession();
1494             session.setRemoteState(EndpointState.CLOSED);
1495             ErrorCondition errorCondition = end.getError();
1496             if(errorCondition !is null)
1497             {
1498                 session.getRemoteCondition().copyFrom(errorCondition);
1499             }
1500 
1501             _connectionEndpoint.put(Type.SESSION_REMOTE_CLOSE, session);
1502         }
1503     }
1504 
1505     public void handleClose(Close close, Binary payload, int channel)
1506     {
1507         _closeReceived = true;
1508         _remoteIdleTimeout = 0;
1509         setRemoteState(EndpointState.CLOSED);
1510         if(_connectionEndpoint !is null)
1511         {
1512             _connectionEndpoint.setRemoteState(EndpointState.CLOSED);
1513             if(close.getError() !is null)
1514             {
1515                 _connectionEndpoint.getRemoteCondition().copyFrom(close.getError());
1516             }
1517 
1518             _connectionEndpoint.put(Type.CONNECTION_REMOTE_CLOSE, _connectionEndpoint);
1519         }
1520 
1521     }
1522 
1523     override
1524     public bool handleFrame(TransportFrame frame)
1525     {
1526         if (!isHandlingFrames())
1527         {
1528             throw new IllegalStateException("Transport cannot accept frame: ");
1529         }
1530 
1531         log(INCOMING, frame);
1532 
1533         ProtocolTracer tracer = _protocolTracer.get();
1534         if( tracer !is null )
1535         {
1536             tracer.receivedFrame(frame);
1537         }
1538 
1539         //import hunt.proton.amqp.transport.Attach;
1540         //import hunt.proton.amqp.transport.Open;
1541         //import hunt.proton.amqp.transport.Begin;
1542         //import hunt.proton.amqp.transport.Detach;
1543         //import hunt.proton.amqp.transport.Close;
1544         //import hunt.proton.amqp.transport.Flow;
1545         //import hunt.proton.amqp.transport.End;
1546         //import hunt.proton.amqp.transport.Transfer;
1547         //import hunt.proton.amqp.transport.EmptyFrame;
1548         //import hunt.proton.amqp.transport.Disposition;
1549         Attach attach = cast(Attach)frame.getBody();
1550         if (attach !is null)
1551         {
1552             attach.invoke(this,frame.getPayload(), frame.getChannel());
1553             return _closeReceived;
1554         }
1555         Open open = cast(Open)frame.getBody();
1556         if (open !is null)
1557         {
1558             open.invoke(this,frame.getPayload(), frame.getChannel());
1559             return _closeReceived;
1560         }
1561         Begin begin = cast(Begin)frame.getBody();
1562         if (begin !is null)
1563         {
1564             begin.invoke(this,frame.getPayload(), frame.getChannel());
1565             return _closeReceived;
1566         }
1567         Detach detach = cast(Detach)frame.getBody();
1568         if (detach !is null)
1569         {
1570             detach.invoke(this,frame.getPayload(), frame.getChannel());
1571             return _closeReceived;
1572         }
1573 
1574         Close close = cast(Close)frame.getBody();
1575         if (close !is null)
1576         {
1577             close.invoke(this,frame.getPayload(), frame.getChannel());
1578             return _closeReceived;
1579         }
1580         Flow flow = cast(Flow)frame.getBody();
1581         if (flow !is null)
1582         {
1583             flow.invoke(this,frame.getPayload(), frame.getChannel());
1584             return _closeReceived;
1585         }
1586         End end = cast(End)frame.getBody();
1587         if (end !is null)
1588         {
1589             end.invoke(this,frame.getPayload(), frame.getChannel());
1590             return _closeReceived;
1591         }
1592         Transfer transfer = cast(Transfer)frame.getBody();
1593         if (transfer !is null)
1594         {
1595             transfer.invoke(this,frame.getPayload(), frame.getChannel());
1596             return _closeReceived;
1597         }
1598         EmptyFrame emptyframe = cast(EmptyFrame)frame.getBody();
1599         if (emptyframe !is null)
1600         {
1601             emptyframe.invoke(this,frame.getPayload(), frame.getChannel());
1602             return _closeReceived;
1603         }
1604         Disposition dispos = cast(Disposition)frame.getBody();
1605         if (dispos !is null)
1606         {
1607             dispos.invoke(this,frame.getPayload(), frame.getChannel());
1608             return _closeReceived;
1609         }
1610       //  (cast(FrameBodyHandler!int)frame.getBody()).invoke(this,frame.getPayload(), frame.getChannel());
1611 
1612         auto b = cast(Object)frame.getBody();
1613         warning("unhandled: ", typeid(b));
1614         return _closeReceived;
1615     }
1616 
1617     void put(Type type, Object context) {
1618         if (_connectionEndpoint !is null) {
1619             _connectionEndpoint.put(type, context);
1620         }
1621     }
1622 
1623     private void maybePostClosed()
1624     {
1625         if (postedHeadClosed && postedTailClosed) {
1626             put(Type.TRANSPORT_CLOSED, this);
1627         }
1628     }
1629 
1630     override
1631     public void closed(TransportException error)
1632     {
1633         if (!_closeReceived || error !is null) {
1634             // Set an error condition, but only if one was not already set
1635             if(!_conditionSet) {
1636                 string description =  error is null ? "connection aborted" : error.toString();
1637                 setCondition(new ErrorCondition(ConnectionError.FRAMING_ERROR, new String( description)));
1638             }
1639 
1640             _head_closed = true;
1641         }
1642 
1643         if (_conditionSet && !postedTransportError) {
1644             put(Type.TRANSPORT_ERROR, this);
1645             postedTransportError = true;
1646         }
1647 
1648         if (!postedTailClosed) {
1649             put(Type.TRANSPORT_TAIL_CLOSED, this);
1650             postedTailClosed = true;
1651             maybePostClosed();
1652         }
1653     }
1654 
1655     override
1656     public bool isHandlingFrames()
1657     {
1658         return _connectionEndpoint !is null || getRemoteState() == EndpointState.UNINITIALIZED;
1659     }
1660 
1661     override
1662     public ProtocolTracer getProtocolTracer()
1663     {
1664         return _protocolTracer.get();
1665     }
1666 
1667     override
1668     public void setProtocolTracer(ProtocolTracer protocolTracer)
1669     {
1670         this._protocolTracer.set(protocolTracer);
1671     }
1672 
1673     override
1674     public ByteBuffer getInputBuffer()
1675     {
1676         return tail();
1677     }
1678 
1679     override
1680     public TransportResult processInput()
1681     {
1682         try {
1683             process();
1684             return TransportResultFactory.ok();
1685         } catch (TransportException e) {
1686             return TransportResultFactory.error(e);
1687         }
1688     }
1689 
1690     override
1691     public ByteBuffer getOutputBuffer()
1692     {
1693         pending();
1694         return head();
1695     }
1696 
1697     override
1698     public void outputConsumed()
1699     {
1700         pop(_outputProcessor.head().position());
1701     }
1702 
1703     override
1704     public int capacity()
1705     {
1706         init();
1707         return _inputProcessor.capacity();
1708     }
1709 
1710     override
1711     public ByteBuffer tail()
1712     {
1713         init();
1714         return _inputProcessor.tail();
1715     }
1716 
1717     override
1718     public void process()
1719     {
1720         _processingStarted = true;
1721 
1722         try {
1723             init();
1724             int beforePosition = _inputProcessor.position();
1725             _inputProcessor.process();
1726             _bytesInput += beforePosition - _inputProcessor.position();
1727         } catch (TransportException e) {
1728             _head_closed = true;
1729         }
1730     }
1731 
1732     override
1733     public void close_tail()
1734     {
1735         init();
1736         _inputProcessor.close_tail();
1737     }
1738 
1739     override
1740     public int pending()
1741     {
1742         init();
1743         return _outputProcessor.pending();
1744     }
1745 
1746     override
1747     public ByteBuffer head()
1748     {
1749         init();
1750         return _outputProcessor.head();
1751     }
1752 
1753     override
1754     public void pop(int bytes)
1755     {
1756         init();
1757         _outputProcessor.pop(bytes);
1758         _bytesOutput += bytes;
1759 
1760         int p = pending();
1761         if (p < 0 && !postedHeadClosed) {
1762             put(Type.TRANSPORT_HEAD_CLOSED, this);
1763             postedHeadClosed = true;
1764             maybePostClosed();
1765         }
1766     }
1767 
1768     override
1769     public void setIdleTimeout(int timeout) {
1770         _localIdleTimeout = timeout;
1771     }
1772 
1773     override
1774     public int getIdleTimeout() {
1775         return _localIdleTimeout;
1776     }
1777 
1778     override
1779     public int getRemoteIdleTimeout() {
1780         return _remoteIdleTimeout;
1781     }
1782 
    /**
     * Periodic tick of the transport.
     *
     * As currently written, the `now` argument is not used by the live code.
     * While synchronized on this object, if pending() reports 0 the method
     * calls writeFrame(0, null, null, null) and then pending() again
     * (presumably an empty keep-alive frame, as in the commented-out logic
     * below - confirm against writeFrame).
     *
     * The local/remote idle-timeout deadline handling is kept below in
     * commented-out form only; it is not executed.
     *
     * Params:
     *   now = current time supplied by the caller (unused by the live code)
     *
     * Returns: always 0, because `deadline` is never reassigned by the live
     *          code.
     */
    override
    public long tick(long now)
    {
      long deadline = 0;
      // Only write when nothing is pending; the second pending() call's
      // result is discarded.
      synchronized(this){
        if (pending() == 0) {
          writeFrame( 0, null, null, null);
          pending();
        }
      }
      //synchronized(this){
      //  if (_localIdleTimeout > 0) {
      //    if (_localIdleDeadline == 0 || _lastBytesInput != _bytesInput) {
      //      _localIdleDeadline = computeDeadline(now, _localIdleTimeout);
      //      _lastBytesInput = _bytesInput;
      //    } else if (_localIdleDeadline - now <= 0) {
      //      _localIdleDeadline = computeDeadline(now, _localIdleTimeout);
      //      if (_connectionEndpoint !is null &&
      //      _connectionEndpoint.getLocalState() != EndpointState.CLOSED) {
      //        ErrorCondition condition =
      //        new ErrorCondition(Symbol.getSymbol("amqp:resource-limit-exceeded"),
      //        new String ("local-idle-timeout expired"));
      //        _connectionEndpoint.setCondition(condition);
      //        _connectionEndpoint.setLocalState(EndpointState.CLOSED);
      //
      //        if (!_isOpenSent) {
      //          if ((_sasl !is null) && (!_sasl.isDone())) {
      //            _sasl.fail();
      //          }
      //          Open open = new Open();
      //          _isOpenSent = true;
      //          writeFrame(0, open, null, null);
      //        }
      //        if (!_isCloseSent) {
      //          Close close = new Close();
      //          close.setError(condition);
      //          _isCloseSent = true;
      //          writeFrame(0, close, null, null);
      //        }
      //        close_tail();
      //      }
      //    }
      //    deadline = _localIdleDeadline;
      //  }
      //
      //  if (_remoteIdleTimeout != 0 && !_isCloseSent) {
      //    if (_remoteIdleDeadline == 0 || _lastBytesOutput != _bytesOutput) {
      //      _remoteIdleDeadline = computeDeadline(now, _remoteIdleTimeout / 2);
      //      _lastBytesOutput = _bytesOutput;
      //    } else if (_remoteIdleDeadline - now <= 0) {
      //      _remoteIdleDeadline = computeDeadline(now, _remoteIdleTimeout / 2);
      //      if (pending() == 0) {
      //        writeFrame(0, null, null, null);
      //        _lastBytesOutput += pending();
      //      }
      //    }
      //
      //    if(deadline == 0) {
      //      deadline = _remoteIdleDeadline;
      //    } else {
      //      if(_remoteIdleDeadline - _localIdleDeadline <= 0) {
      //        deadline = _remoteIdleDeadline;
      //      } else {
      //        deadline = _localIdleDeadline;
      //      }
      //    }
      //  }
      //}
      return deadline;
    }
1853 
1854     private long computeDeadline(long now, long timeout) {
1855         long deadline = now + timeout;
1856 
1857         // We use 0 to signal not-initialised and/or no-timeout, so in the
1858         // unlikely event thats to be the actual deadline, return 1 instead
1859         return deadline != 0 ? deadline : 1;
1860     }
1861 
1862     override
1863     public long getFramesOutput()
1864     {
1865         return _frameWriter.getFramesOutput();
1866     }
1867 
1868     override
1869     public long getFramesInput()
1870     {
1871         return _frameParser.getFramesInput();
1872     }
1873 
1874     override
1875     public void close_head()
1876     {
1877         _outputProcessor.close_head();
1878     }
1879 
1880     override
1881     public bool isClosed() {
1882         int p = pending();
1883         int c = capacity();
1884         return  p == END_OF_STREAM && c == END_OF_STREAM;
1885     }
    /**
     * Unimplemented stub: calls implementationMissing(false) and returns an
     * empty string. The intended format is preserved in the commented-out
     * line below.
     */
    override
    public string toString()
    {
        implementationMissing(false);
        return "";
       // return "TransportImpl [_connectionEndpoint=" ~ _connectionEndpoint ~ ", " ~ super.toString() ~ "]";
    }
1893 
1894     /**
1895      * Override the default frame handler. Must be called before the transport starts being used
1896      * (e.g. {@link #getInputBuffer()}, {@link #getOutputBuffer()}, {@link #ssl(SslDomain)} etc).
1897      */
1898     public void setFrameHandler(FrameHandler frameHandler)
1899     {
1900         _frameHandler = frameHandler;
1901     }
1902 
1903     static string INCOMING = "<-";
1904     static string OUTGOING = "->";
1905 
1906     void log(string event, TransportFrame frame)
1907     {
1908         if (isTraceFramesEnabled()) {
1909             outputMessage(event, frame.getChannel(), cast(Object)frame.getBody(), frame.getPayload());
1910         }
1911     }
1912 
1913     void log(string event, SaslFrameBody frameBody) {
1914         if (isTraceFramesEnabled()) {
1915             outputMessage(event, 0, cast(Object)frameBody, null);
1916         }
1917     }
1918 
1919     void log(string event, String headerDescription) {
1920         if (isTraceFramesEnabled()) {
1921             outputMessage(event, 0, headerDescription, null);
1922         }
1923     }
1924 
    /**
     * Unimplemented stub for emitting a frame trace line.
     *
     * The body only calls implementationMissing(false); none of the
     * arguments are used. The commented-out code below sketches the intended
     * output: an identity/channel prefix, the event label, the frame body
     * and, when a payload is present, its length and a quoted excerpt.
     */
    private void outputMessage(string event, int channel, Object frameBody, Binary payload) {
        implementationMissing(false);
        //string msg         ;
        //
        //msg.append("[").append(System.identityHashCode(this)).append(":").append(channel).append("] ");
        //msg.append(event).append(" ").append(frameBody);
        //if (payload !is null) {
        //    msg.append(" (").append(payload.getLength()).append(") ");
        //    msg.append(StringUtils.toQuotedString(payload, TRACE_FRAME_PAYLOAD_LENGTH, true));
        //}
        //
        //System.out.println(msg.toString());
    }
1938 
1939     bool isFrameTracingEnabled()
1940     {
1941         return (_levels & TRACE_FRM) != 0 || _protocolTracer.get() !is null;
1942     }
1943 
1944     bool isTraceFramesEnabled()
1945     {
1946         return (_levels & TRACE_FRM) != 0;
1947     }
1948 
1949     override
1950     void localOpen() {}
1951 
1952     override
1953     void localClose() {}
1954 
1955     public void setSelectable(Selectable selectable) {
1956         _selectable = selectable;
1957     }
1958 
1959     public Selectable getSelectable() {
1960         return _selectable;
1961     }
1962 
1963     public void setReactor(Reactor reactor) {
1964         _reactor = reactor;
1965     }
1966 
1967     public Reactor getReactor() {
1968         return _reactor;
1969     }
1970 
1971     override
1972     public void setEmitFlowEventOnSend(bool emitFlowEventOnSend)
1973     {
1974         _emitFlowEventOnSend = emitFlowEventOnSend;
1975     }
1976 
1977     override
1978     public bool isEmitFlowEventOnSend()
1979     {
1980         return _emitFlowEventOnSend;
1981     }
1982 
1983     override
1984     public void setUseReadOnlyOutputBuffer(bool value)
1985     {
1986         this._useReadOnlyOutputBuffer = value;
1987     }
1988 
1989     override
1990     public bool isUseReadOnlyOutputBuffer()
1991     {
1992         return _useReadOnlyOutputBuffer;
1993     }
1994 
1995     // From TransportInternal
1996     override
1997     public void addTransportLayer(TransportLayer layer)
1998     {
1999         if (_processingStarted)
2000         {
2001             throw new IllegalStateException("Additional layer can't be added after transport has started processing");
2002         }
2003 
2004         if (_additionalTransportLayers is null)
2005         {
2006             _additionalTransportLayers = new ArrayList!TransportLayer();
2007         }
2008 
2009         if (!_additionalTransportLayers.contains(layer))
2010         {
2011             init();
2012             TransportWrapper transportWrapper = layer.wrap(_inputProcessor, _outputProcessor);
2013             _inputProcessor = transportWrapper;
2014             _outputProcessor = transportWrapper;
2015             _additionalTransportLayers.add(layer);
2016         }
2017     }
2018 
2019     override
2020     public void setOutboundFrameSizeLimit(int limit) {
2021         _outboundFrameSizeLimit = limit;
2022     }
2023 
2024     override
2025     public int getOutboundFrameSizeLimit() {
2026         return _outboundFrameSizeLimit;
2027     }
2028 }