1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.SaslImpl;
13 import hunt.String;
14 import hunt.Exceptions;
15 import  hunt.proton.engine.impl.ByteBufferUtils;
16 import hunt.io.ByteBuffer;
17 import hunt.proton.amqp.Binary;
18 import hunt.proton.amqp.Symbol;
19 import hunt.proton.amqp.security.SaslChallenge;
20 import hunt.proton.amqp.security.SaslCode;
21 import hunt.proton.amqp.security.SaslFrameBody;
22 import hunt.proton.amqp.security.SaslInit;
23 import hunt.proton.amqp.security.SaslMechanisms;
24 import hunt.proton.amqp.security.SaslResponse;
25 import hunt.proton.codec.AMQPDefinedTypes;
26 import hunt.proton.codec.DecoderImpl;
27 import hunt.proton.codec.EncoderImpl;
28 import hunt.proton.engine.Sasl;
29 import hunt.proton.engine.SaslListener;
30 import hunt.proton.engine.Transport;
31 import hunt.proton.engine.TransportException;
32 import hunt.Object;
33 import hunt.logging;
34 import hunt.proton.engine.impl.PlainTransportWrapper;
35 import hunt.proton.engine.impl.SaslSniffer;
36 import hunt.proton.engine.impl.TransportInput;
37 import hunt.proton.engine.impl.TransportOutput;
38 import hunt.proton.engine.impl.SaslFrameHandler;
39 import hunt.proton.engine.impl.TransportLayer;
40 import hunt.proton.engine.impl.TransportImpl;
41 import hunt.proton.engine.impl.FrameWriter;
42 import hunt.proton.engine.impl.SaslFrameParser;
43 import hunt.proton.engine.impl.AmqpHeader;
44 import hunt.proton.engine.impl.ProtocolTracer;
45 import hunt.proton.engine.impl.TransportWrapper;
46 import hunt.collection.ArrayList;
47 import hunt.proton.amqp.security.SaslOutcome;
48 import hunt.logging;
49 import std.conv:to;
50 
51 class SaslImpl : Sasl, SaslFrameBodyHandler!Void, SaslFrameHandler, TransportLayer
52 {
53 
54     static byte SASL_FRAME_TYPE = cast(byte) 1;
55     private static string HEADER_DESCRIPTION = "SASL";
56 
57     private DecoderImpl _decoder ;// = new DecoderImpl();
58     private EncoderImpl _encoder ;//= new EncoderImpl(_decoder);
59 
60     private TransportImpl _transport;
61 
62     private bool _tail_closed = false;
63     private bool _head_closed = false;
64     private int _maxFrameSize;
65     private FrameWriter _frameWriter;
66 
67     private ByteBuffer _pending;
68 
69     private bool _headerWritten;
70     private Binary _challengeResponse;
71     private SaslFrameParser _frameParser;
72     private bool _initReceived;
73     private bool _mechanismsSent;
74     private bool _initSent;
75 
76     enum Role { CLIENT, SERVER }
77 
78     private hunt.proton.engine.Sasl.SaslOutcome _outcome ;// = SaslOutcome.PN_SASL_NONE;
79     private SaslState _state ;//= SaslState.PN_SASL_IDLE;
80 
81     private string _hostname;
82     private bool _done;
83     private Symbol[] _mechanisms;
84 
85     private Symbol _chosenMechanism;
86 
87     private Role _role;
88     private bool _allowSkip = true;
89 
90     private SaslListener _saslListener;
91 
92     /**
93      * @param maxFrameSize the size of the input and output buffers
94      * {@link SaslTransportWrapper#_inputBuffer} and
95      * {@link SaslTransportWrapper#_outputBuffer}.
96      */
97     this(TransportImpl transport, int maxFrameSize)
98     {
99         _role  = Role.CLIENT;
100         _outcome = hunt.proton.engine.Sasl.SaslOutcome.PN_SASL_NONE;
101         _state = SaslState.PN_SASL_IDLE;
102         _transport = transport;
103         _maxFrameSize = maxFrameSize;
104         _decoder = new DecoderImpl();
105         _encoder = new EncoderImpl(_decoder);
106         AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
107         _frameParser = new SaslFrameParser(this, _decoder, maxFrameSize, _transport);
108         _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, _transport);
109     }
110 
111     void fail() {
112         if ( _role == Role.CLIENT) {
113             _role = Role.CLIENT;
114             _initSent = true;
115             logInfo("_initSent true !!!!!!!");
116         } else {
117             _initReceived = true;
118 
119         }
120         _done = true;
121         logInfo("_done true !!!!!!!");
122         _outcome = hunt.proton.engine.Sasl.SaslOutcome.PN_SASL_SYS;
123     }
124 
125     override
126     bool isDone()
127     {
128         return _done && (_role==Role.CLIENT || _initReceived);
129     }
130 
131     private void process()
132     {
133         processHeader();
134 
135         if(_role == Role.SERVER)
136         {
137             if(!_mechanismsSent && _mechanisms !is null)
138             {
139                 SaslMechanisms mechanisms = new SaslMechanisms();
140 
141                 mechanisms.setSaslServerMechanisms(new ArrayList!Symbol (_mechanisms));
142                // mechanisms.setSaslServerMechanisms(new ArrayList!Symbol (Symbol.valueOf("ANONYMOUS")));
143                 writeFrame(mechanisms);
144                 _mechanismsSent = true;
145                 _state = SaslState.PN_SASL_STEP;
146             }
147 
148             if(getState() == SaslState.PN_SASL_STEP && getChallengeResponse() !is null)
149             {
150                 SaslChallenge challenge = new SaslChallenge();
151                 challenge.setChallenge(getChallengeResponse());
152                 writeFrame(challenge);
153                 setChallengeResponse(null);
154             }
155 
156             if(_done)
157             {
158                 hunt.proton.amqp.security.SaslOutcome.SaslOutcome outcome =
159                         new hunt.proton.amqp.security.SaslOutcome.SaslOutcome();
160                 outcome.setCode(SaslCode.values()[_outcome.getCode()]);
161                 if (_outcome == hunt.proton.engine.Sasl.SaslOutcome.PN_SASL_OK)
162                 {
163                     outcome.setAdditionalData(getChallengeResponse());
164                 }
165                 writeFrame(outcome);
166                 setChallengeResponse(null);
167             }
168         }
169         else if(_role == Role.CLIENT)
170         {
171             if(getState() == SaslState.PN_SASL_IDLE && _chosenMechanism !is null)
172             {
173                 processInit();
174                 _state = SaslState.PN_SASL_STEP;
175 
176                 //HACK: if we received an outcome before
177                 //we sent our init, change the state now
178                 if(_outcome != hunt.proton.engine.Sasl.SaslOutcome.PN_SASL_NONE)
179                 {
180                     _state = classifyStateFromOutcome(_outcome);
181                 }
182             }
183 
184             if(getState() == SaslState.PN_SASL_STEP && getChallengeResponse() !is null)
185             {
186                 processResponse();
187             }
188         }
189     }
190 
191     private void writeFrame(SaslFrameBody frameBody)
192     {
193         _frameWriter.writeFrame(cast(Object)frameBody);
194     }
195 
196     override
197     int recv(byte[] bytes, int offset, int size)
198     {
199         if(_pending is null)
200         {
201             return -1;
202         }
203         int written = ByteBufferUtils.pourBufferToArray(_pending, bytes, offset, size);
204         if(!_pending.hasRemaining())
205         {
206             _pending = null;
207         }
208         return written;
209     }
210 
211     override
212     int send(byte[] bytes, int offset, int size)
213     {
214         byte[] data = new byte[size];
215         //System.arraycopy(bytes, offset, data, 0, size);
216         data[0 .. size] = bytes[offset .. offset+size];
217         setChallengeResponse(new Binary(data));
218         return size;
219     }
220 
221     int processHeader()
222     {
223         if(!_headerWritten)
224         {
225             logHeader();
226             _frameWriter.writeHeader(AmqpHeader.SASL_HEADER);
227             _headerWritten = true;
228             return cast(int)(AmqpHeader.SASL_HEADER.length);
229         }
230         else
231         {
232             return 0;
233         }
234     }
235 
236     private void logHeader()
237     {
238         if (_transport.isFrameTracingEnabled())
239         {
240             _transport.log(TransportImpl.OUTGOING, new String(HEADER_DESCRIPTION));
241 
242             ProtocolTracer tracer = _transport.getProtocolTracer();
243             if (tracer !is null)
244             {
245                 tracer.sentHeader(HEADER_DESCRIPTION);
246             }
247         }
248     }
249 
250     override
251     int pending()
252     {
253         return _pending is null ? 0 : _pending.remaining();
254     }
255 
256     void setPending(ByteBuffer pending)
257     {
258         _pending = pending;
259     }
260 
261     override
262     SaslState getState()
263     {
264         return _state;
265     }
266 
267     Binary getChallengeResponse()
268     {
269         return _challengeResponse;
270     }
271 
272     void setChallengeResponse(Binary challengeResponse)
273     {
274         _challengeResponse = challengeResponse;
275     }
276 
277     override
278     void setMechanisms(string[] mechanisms)
279     {
280         if(mechanisms !is null && mechanisms.length != 0)
281         {
282             _mechanisms = new Symbol[mechanisms.length];
283             for(int i = 0; i < mechanisms.length; i++)
284             {
285                 _mechanisms[i] = Symbol.valueOf(mechanisms[i]);
286             }
287         }
288 
289         if(_role == Role.CLIENT)
290         {
291            // assert mechanisms !is null;
292            // assert mechanisms.length == 1;
293 
294             _chosenMechanism = Symbol.valueOf(mechanisms[0]);
295         }
296     }
297 
298     override
299     string[] getRemoteMechanisms()
300     {
301         if(_role == Role.SERVER)
302         {
303             return _chosenMechanism is null ? [] : [ _chosenMechanism.toString() ];
304         }
305         else if(_role == Role.CLIENT)
306         {
307             if(_mechanisms is null)
308             {
309                 return [];
310             }
311             else
312             {
313                 string[] remoteMechanisms =  new string[_mechanisms.length];
314                 //string[] remoteMechanisms = ["ANONYMOUS"] ;//= new string[_mechanisms.length];
315                 //logInfo("length : %d",_mechanisms.length);
316                 //if (_mechanisms[0] is null)
317                 //{
318                 //    logInfo("sssssssssssssssssssssssssssssssssssssss");
319                 //}
320                 //logInfo("ccccc %s :",_mechanisms[0].toString);
321                 foreach (Symbol sy; _mechanisms)
322                 {
323                     remoteMechanisms ~= sy.toString;
324                 }
325                 for(int i = 0; i < _mechanisms.length; i++)
326                 {
327                     remoteMechanisms[i] = _mechanisms[i].toString();
328                 }
329                 return remoteMechanisms;
330             }
331         }
332         else
333         {
334             throw new IllegalStateException();
335         }
336     }
337 
338     void setMechanism(Symbol mechanism)
339     {
340         _chosenMechanism = mechanism;
341     }
342 
343     Symbol getChosenMechanism()
344     {
345         return _chosenMechanism;
346     }
347 
348     void setResponse(Binary initialResponse)
349     {
350         setPending(initialResponse.asByteBuffer());
351     }
352 
353     override
354     void handle(SaslFrameBody o, Binary payload)
355     {
356         SaslMechanisms frameBody = cast(SaslMechanisms)o;
357         if (frameBody !is null)
358         {
359             _transport.log(TransportImpl.INCOMING, frameBody);
360 
361             ProtocolTracer tracer = _transport.getProtocolTracer();
362             if( tracer !is null )
363             {
364                 tracer.receivedSaslBody(frameBody);
365             }
366 
367             frameBody.invoke(this, payload, null);
368             return;
369         }
370         hunt.proton.amqp.security.SaslOutcome.SaslOutcome frameBodyOutCome = cast(hunt.proton.amqp.security.SaslOutcome.SaslOutcome)o;
371         if (frameBodyOutCome !is null)
372         {
373             _transport.log(TransportImpl.INCOMING, frameBodyOutCome);
374 
375             ProtocolTracer tracer = _transport.getProtocolTracer();
376             if( tracer !is null )
377             {
378                 tracer.receivedSaslBody(frameBodyOutCome);
379             }
380 
381             frameBodyOutCome.invoke(this, payload, null);
382             return;
383         }
384         SaslInit frameBodyInit = cast(SaslInit)o;
385         if (frameBodyInit !is null)
386         {
387             _transport.log(TransportImpl.INCOMING, frameBodyInit);
388 
389             ProtocolTracer tracer = _transport.getProtocolTracer();
390             if( tracer !is null )
391             {
392                 tracer.receivedSaslBody(frameBodyInit);
393             }
394 
395             frameBodyInit.invoke(this, payload, null);
396             return;
397         }
398 
399         SaslResponse frameBodyResp = cast(SaslResponse)o;
400         if (frameBodyResp !is null)
401         {
402             _transport.log(TransportImpl.INCOMING, frameBodyResp);
403 
404             ProtocolTracer tracer = _transport.getProtocolTracer();
405             if( tracer !is null )
406             {
407                 tracer.receivedSaslBody(frameBodyResp);
408             }
409 
410             frameBodyResp.invoke(this, payload, null);
411             return;
412         }
413         SaslChallenge frameBodyChall = cast(SaslChallenge)o;
414         if (frameBodyChall !is null)
415         {
416             _transport.log(TransportImpl.INCOMING, frameBodyChall);
417 
418             ProtocolTracer tracer = _transport.getProtocolTracer();
419             if( tracer !is null )
420             {
421                 tracer.receivedSaslBody(frameBodyChall);
422             }
423 
424             frameBodyChall.invoke(this, payload, null);
425             return;
426         }
427 
428 
429     }
430 
431     override
432     void handleInit(SaslInit saslInit, Binary payload, Void context)
433     {
434         if(_role  == Role.SERVER)
435         {
436             server();
437         }
438         checkRole(Role.SERVER);
439         _hostname = cast(string)(saslInit.getHostname().getBytes());
440         _chosenMechanism = saslInit.getMechanism();
441         _initReceived = true;
442         if(saslInit.getInitialResponse() !is null)
443         {
444             setPending(saslInit.getInitialResponse().asByteBuffer());
445         }
446 
447         if(_saslListener !is null) {
448             _saslListener.onSaslInit(this, _transport);
449         }
450     }
451 
452     void invoke(SaslFrameBodyHandler!Void handler, Binary payload, Void context)
453     {
454         implementationMissing(false);
455 
456     }
457 
458     override
459     void handleResponse(SaslResponse saslResponse, Binary payload, Void context)
460     {
461         checkRole(Role.SERVER);
462         setPending(saslResponse.getResponse()  is null ? null : saslResponse.getResponse().asByteBuffer());
463 
464         if(_saslListener !is null) {
465             _saslListener.onSaslResponse(this, _transport);
466         }
467     }
468 
469     override
470     void done(hunt.proton.engine.Sasl.SaslOutcome outcome)
471     {
472         checkRole(Role.SERVER);
473         _outcome = outcome;
474         _done = true;
475         logInfo("_done true !!!!!!!");
476         _state = classifyStateFromOutcome(outcome);
477         logInfo("SASL negotiation done");
478 
479     }
480 
481     private void checkRole(Role role)
482     {
483         //if(role != _role)
484         //{
485         //    throw new IllegalStateException("Role is " + to!string(_role) + " but should be " + to!string(role));
486         //}
487     }
488 
489     override
490     void handleMechanisms(SaslMechanisms saslMechanisms, Binary payload, Void context)
491     {
492         if(_role  == Role.CLIENT)
493         {
494             client();
495         }
496         checkRole(Role.CLIENT);
497         if (saslMechanisms.getSaslServerMechanisms() is null)
498         {
499             _mechanisms ~= Symbol.valueOf("ANONYMOUS");
500         }
501         else {
502             _mechanisms = saslMechanisms.getSaslServerMechanisms().toArray();
503         }
504 
505 
506         if(_saslListener !is null) {
507             _saslListener.onSaslMechanisms(this, _transport);
508         }
509     }
510 
511     override
512     void handleChallenge(SaslChallenge saslChallenge, Binary payload, Void context)
513     {
514         checkRole(Role.CLIENT);
515         setPending(saslChallenge.getChallenge()  is null ? null : saslChallenge.getChallenge().asByteBuffer());
516 
517         if(_saslListener !is null) {
518             _saslListener.onSaslChallenge(this, _transport);
519         }
520     }
521 
522     override
523     void handleOutcome(hunt.proton.amqp.security.SaslOutcome.SaslOutcome saslOutcome,
524                               Binary payload,
525                               Void context)
526     {
527         checkRole(Role.CLIENT);
528         foreach(hunt.proton.engine.Sasl.SaslOutcome outcome ; hunt.proton.engine.Sasl.SaslOutcome.values())
529         {
530             setPending(saslOutcome.getAdditionalData()  is null ? null : saslOutcome.getAdditionalData().asByteBuffer());
531             if(outcome.getCode() == saslOutcome.getCode().ordinal())
532             {
533                 _outcome = outcome;
534                 if (_state != SaslState.PN_SASL_IDLE)
535                 {
536                     _state = classifyStateFromOutcome(outcome);
537                 }
538                 break;
539             }
540         }
541         _done = true;
542 
543         //if(_logger.isLoggable(Level.FINE))
544         //{
545         //    _logger.fine("Handled outcome: " + this);
546         //}
547 
548         if(_saslListener !is null) {
549             _saslListener.onSaslOutcome(this, _transport);
550         }
551     }
552 
553     private SaslState classifyStateFromOutcome(hunt.proton.engine.Sasl.SaslOutcome outcome)
554     {
555         return outcome == hunt.proton.engine.Sasl.SaslOutcome.PN_SASL_OK ? SaslState.PN_SASL_PASS : SaslState.PN_SASL_FAIL;
556     }
557 
558     private void processResponse()
559     {
560         SaslResponse response = new SaslResponse();
561         response.setResponse(getChallengeResponse());
562         setChallengeResponse(null);
563         writeFrame(response);
564     }
565 
566     private void processInit()
567     {
568         SaslInit init = new SaslInit();
569 
570         init.setHostname(_hostname is null? null : new String(_hostname));
571         init.setMechanism(_chosenMechanism);
572         if(getChallengeResponse() !is null)
573         {
574             init.setInitialResponse(getChallengeResponse());
575             setChallengeResponse(null);
576         }
577         _initSent = true;
578         writeFrame(init);
579     }
580 
581     void plain(String username, String password)
582     {
583         client();
584         _chosenMechanism = Symbol.valueOf("PLAIN");
585         byte[] usernameBytes = username.getBytes();
586         byte[] passwordBytes = password.getBytes();
587         byte[] data = new byte[usernameBytes.length+passwordBytes.length+2];
588         //System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
589         data[1 .. 1+usernameBytes.length] = usernameBytes[0 .. usernameBytes.length ];
590       //  System.arraycopy(passwordBytes, 0, data, 2+usernameBytes.length, passwordBytes.length);
591         data[2+usernameBytes.length .. 2+usernameBytes.length+passwordBytes.length] = passwordBytes[0 .. passwordBytes.length];
592         setChallengeResponse(new Binary(data));
593     }
594 
595     override
596     hunt.proton.engine.Sasl.SaslOutcome getOutcome()
597     {
598         return _outcome;
599     }
600 
601     override
602     void client()
603     {
604         _role = Role.CLIENT;
605         if(_mechanisms !is null)
606         {
607             //assert _mechanisms.length == 1;
608 
609             _chosenMechanism = _mechanisms[0];
610         }
611     }
612 
613     override
614     void server()
615     {
616         _role = Role.SERVER;
617     }
618 
619     override
620     void allowSkip(bool allowSkip)
621     {
622         _allowSkip = allowSkip;
623     }
624 
625     override
626     TransportWrapper wrap(TransportInput input, TransportOutput output)
627     {
628         return new class SaslSniffer {
629 
630             this()
631             {
632                 super(new SwitchingSaslTransportWrapper(input, output),new PlainTransportWrapper(output, input));
633             }
634 
635             override
636             protected bool isDeterminationMade() {
637                 if (_role == Role.SERVER && _allowSkip) {
638                     return super.isDeterminationMade();
639                 } else {
640                     _selectedTransportWrapper = _wrapper1;
641                     return true;
642                 }
643             }
644         };
645     }
646 
647     //override
648     //String toString()
649     //{
650     //    StringBuilder builder = new StringBuilder();
651     //    builder
652     //        .append("SaslImpl [_outcome=").append(_outcome)
653     //        .append(", state=").append(_state)
654     //        .append(", done=").append(_done)
655     //        .append(", role=").append(_role)
656     //        .append("]");
657     //    return builder.toString();
658     //}
659 
660     class SaslTransportWrapper : TransportWrapper
661     {
662         private TransportInput _underlyingInput;
663         private TransportOutput _underlyingOutput;
664         private bool _outputComplete;
665 
666         private ByteBuffer _outputBuffer;
667         private ByteBuffer _inputBuffer;
668         private ByteBuffer _head;
669 
670         private SwitchingSaslTransportWrapper _parent;
671 
672         this(SwitchingSaslTransportWrapper parent, TransportInput input, TransportOutput output)
673         {
674             _underlyingInput = input;
675             _underlyingOutput = output;
676 
677             _inputBuffer = ByteBufferUtils.newWriteableBuffer(_maxFrameSize);
678             _outputBuffer = ByteBufferUtils.newWriteableBuffer(_maxFrameSize);
679 
680             _parent = parent;
681 
682             if (_transport.isUseReadOnlyOutputBuffer()) {
683                 _head = _outputBuffer.asReadOnlyBuffer();
684             } else {
685                 _head = _outputBuffer.duplicate();
686             }
687 
688             _head.limit(0);
689         }
690 
691         private void fillOutputBuffer()
692         {
693             if(isOutputInSaslMode())
694             {
695                 writeSaslOutput();
696                 if(_done)
697                 {
698                     _outputComplete = true;
699                 }
700             }
701         }
702 
703         /**
704          * TODO rationalise this method with respect to the other similar checks of _role/_initReceived etc
705          * @see SaslImpl#isDone()
706          */
707         private bool isInputInSaslMode()
708         {
709             return  (_role == Role.CLIENT && !_done) || (_role == Role.SERVER && (!_initReceived || !_done));
710         }
711 
712         private bool isOutputInSaslMode()
713         {
714             return  (_role == Role.CLIENT && (!_done || !_initSent)) || (_role == Role.SERVER && !_outputComplete);
715         }
716 
717         override
718         int capacity()
719         {
720             if (_tail_closed) return Transport.END_OF_STREAM;
721             if (isInputInSaslMode())
722             {
723                 return _inputBuffer.remaining();
724             }
725             else
726             {
727                 return _underlyingInput.capacity();
728             }
729         }
730 
731         override
732         int position()
733         {
734             if (_tail_closed) return Transport.END_OF_STREAM;
735             if (isInputInSaslMode())
736             {
737                 return _inputBuffer.position();
738             }
739             else
740             {
741                 return _underlyingInput.position();
742             }
743         }
744 
745         override
746         ByteBuffer tail()
747         {
748             if (!isInputInSaslMode())
749             {
750                 return _underlyingInput.tail();
751             }
752 
753             return _inputBuffer;
754         }
755 
756         override
757         void process()
758         {
759             _inputBuffer.flip();
760 
761             try
762             {
763                 reallyProcessInput();
764             }
765             finally
766             {
767                 _inputBuffer.compact();
768             }
769         }
770 
771         override
772         void close_tail()
773         {
774             _tail_closed = true;
775             if (isInputInSaslMode()) {
776                 _head_closed = true;
777                 _underlyingInput.close_tail();
778             } else {
779                 _underlyingInput.close_tail();
780             }
781         }
782 
783         private void reallyProcessInput()
784         {
785             if(isInputInSaslMode())
786             {
787                 //if(_logger.isLoggable(Level.FINER))
788                 //{
789                 //    _logger.log(Level.FINER, SaslImpl.this + " about to call input.");
790                 //}
791 
792                 _frameParser.input(_inputBuffer);
793             }
794 
795             if(!isInputInSaslMode())
796             {
797                 //if(_logger.isLoggable(Level.FINER))
798                 //{
799                 //    _logger.log(Level.FINER, SaslImpl.this + " about to call plain input");
800                 //}
801 
802                 if (_inputBuffer.hasRemaining())
803                 {
804                     int bytes = ByteBufferUtils.pourAll(_inputBuffer, _underlyingInput);
805                     if (bytes == Transport.END_OF_STREAM)
806                     {
807                         _tail_closed = true;
808                     }
809 
810                     if (!_inputBuffer.hasRemaining())
811                     {
812                         _parent.switchToNextInput();
813                     }
814                 }
815                 else
816                 {
817                     _parent.switchToNextInput();
818                 }
819 
820                 _underlyingInput.process();
821             }
822         }
823 
824         override
825         int pending()
826         {
827           version(HUNT_AMQP_DEBUG)
828           {
829             // logInfof("isOutputInSaslMode : %d  -----------pos: %d",isOutputInSaslMode(),_outputBuffer.position());
830           }
831             if (isOutputInSaslMode() || _outputBuffer.position() != 0)
832             {
833                 fillOutputBuffer();
834                 _head.limit(_outputBuffer.position());
835 
836                 if (_head_closed && _outputBuffer.position() == 0)
837                 {
838                     return Transport.END_OF_STREAM;
839                 }
840                 else
841                 {
842                     return _outputBuffer.position();
843                 }
844             }
845             else
846             {
847                 _parent.switchToNextOutput();
848                 return _underlyingOutput.pending();
849             }
850         }
851 
852         override
853         ByteBuffer head()
854         {
855             if (isOutputInSaslMode() || _outputBuffer.position() != 0)
856             {
857                 pending();
858                 return _head;
859             }
860             else
861             {
862                 _parent.switchToNextOutput();
863                 return _underlyingOutput.head();
864             }
865         }
866 
867         override
868         void pop(int bytes)
869         {
870             if (isOutputInSaslMode() || _outputBuffer.position() != 0)
871             {
872                 _outputBuffer.flip();
873                 _outputBuffer.position(bytes);
874                 _outputBuffer.compact();
875                 _head.position(0);
876                 _head.limit(_outputBuffer.position());
877             }
878             else
879             {
880                 _parent.switchToNextOutput();
881                 _underlyingOutput.pop(bytes);
882             }
883         }
884 
885         override
886         void close_head()
887         {
888             _parent.switchToNextOutput();
889             _underlyingOutput.close_head();
890         }
891 
892         private void writeSaslOutput()
893         {
894             this.outer.process();
895             _frameWriter.readBytes(_outputBuffer);
896             //logInfo("Finished writing SASL output. Output Buffer : %s", _outputBuffer.getRemaining());
897 
898             //if(_logger.isLoggable(Level.FINER))
899             //{
900             //    _logger.log(Level.FINER, "Finished writing SASL output. Output Buffer : " + _outputBuffer);
901             //}
902         }
903     }
904 
905      class SwitchingSaslTransportWrapper : TransportWrapper {
906 
907         private TransportInput _underlyingInput;
908         private TransportOutput _underlyingOutput;
909 
910         private TransportInput currentInput;
911         private TransportOutput currentOutput;
912 
913         this(TransportInput input, TransportOutput output) {
914             _underlyingInput = input;
915             _underlyingOutput = output;
916 
917             // The wrapper can be GC'd after both current's are switched to next.
918             SaslTransportWrapper saslProcessor = new SaslTransportWrapper(this, input, output);
919 
920             currentInput = saslProcessor;
921             currentOutput = saslProcessor;
922         }
923 
924         int capacity() {
925             return currentInput.capacity();
926         }
927 
928         int position() {
929             return currentInput.position();
930         }
931 
932         ByteBuffer tail()  {
933             return currentInput.tail();
934         }
935 
936         void process()  {
937             currentInput.process();
938         }
939 
940         void close_tail() {
941             currentInput.close_tail();
942         }
943 
944         int pending() {
945             return currentOutput.pending();
946         }
947 
948         ByteBuffer head() {
949             return currentOutput.head();
950         }
951 
952         void pop(int bytes) {
953             currentOutput.pop(bytes);
954         }
955 
956         void close_head() {
957             currentOutput.close_head();
958         }
959 
960         void switchToNextInput() {
961             currentInput = _underlyingInput;
962         }
963 
964         void switchToNextOutput() {
965             currentOutput = _underlyingOutput;
966         }
967     }
968 
969     string getHostname()
970     {
971         //if(_role = Role.CLIENT)
972         //{
973         //    checkRole(Role.SERVER);
974         //}
975 
976         return _hostname;
977     }
978 
979     void setRemoteHostname(string hostname)
980     {
981         //if(_role !is null)
982         //{
983         //    checkRole(Role.CLIENT);
984         //}
985 
986         _hostname = hostname;
987     }
988 
989     override
990     void setListener(SaslListener saslListener) {
991         _saslListener = saslListener;
992     }
993 }