/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.proton.engine.impl.SaslImpl;

import hunt.String;
import hunt.Exceptions;
import hunt.proton.engine.impl.ByteBufferUtils;
import hunt.io.ByteBuffer;
import hunt.proton.amqp.Binary;
import hunt.proton.amqp.Symbol;
import hunt.proton.amqp.security.SaslChallenge;
import hunt.proton.amqp.security.SaslCode;
import hunt.proton.amqp.security.SaslFrameBody;
import hunt.proton.amqp.security.SaslInit;
import hunt.proton.amqp.security.SaslMechanisms;
import hunt.proton.amqp.security.SaslResponse;
import hunt.proton.codec.AMQPDefinedTypes;
import hunt.proton.codec.DecoderImpl;
import hunt.proton.codec.EncoderImpl;
import hunt.proton.engine.Sasl;
import hunt.proton.engine.SaslListener;
import hunt.proton.engine.Transport;
import hunt.proton.engine.TransportException;
import hunt.Object;
import hunt.logging;
import hunt.proton.engine.impl.PlainTransportWrapper;
import hunt.proton.engine.impl.SaslSniffer;
import hunt.proton.engine.impl.TransportInput;
import hunt.proton.engine.impl.TransportOutput;
import hunt.proton.engine.impl.SaslFrameHandler;
import hunt.proton.engine.impl.TransportLayer;
import hunt.proton.engine.impl.TransportImpl;
import hunt.proton.engine.impl.FrameWriter;
import hunt.proton.engine.impl.SaslFrameParser;
import hunt.proton.engine.impl.AmqpHeader;
import hunt.proton.engine.impl.ProtocolTracer;
import hunt.proton.engine.impl.TransportWrapper;
import hunt.collection.ArrayList;
import hunt.proton.amqp.security.SaslOutcome;
import hunt.logging;
import std.conv:to;

/**
 * SASL layer of the transport.
 *
 * Tracks the negotiation state for either role (client or server), turns the
 * locally supplied mechanisms / challenge-response bytes into SASL frames via
 * a FrameWriter, and reacts to incoming SASL frames handed over by the
 * SaslFrameParser. The nested transport wrappers route bytes through this
 * layer while negotiation is in progress and hand over to the underlying
 * input/output afterwards.
 */
class SaslImpl : Sasl, SaslFrameBodyHandler!Void, SaslFrameHandler, TransportLayer
{

    static byte SASL_FRAME_TYPE = cast(byte) 1;
    private static string HEADER_DESCRIPTION = "SASL";

    private DecoderImpl _decoder;
    private EncoderImpl _encoder;

    private TransportImpl _transport;

    private bool _tail_closed = false;
    private bool _head_closed = false;
    private int _maxFrameSize;
    private FrameWriter _frameWriter;

    // Bytes received from the peer (initial response, challenge, response or
    // outcome additional data) that the application has not yet read via recv().
    private ByteBuffer _pending;

    private bool _headerWritten;
    // Bytes supplied locally via send()/plain() that still have to be written
    // into the next init/challenge/response/outcome frame.
    private Binary _challengeResponse;
    private SaslFrameParser _frameParser;
    private bool _initReceived;
    private bool _mechanismsSent;
    private bool _initSent;

    enum Role { CLIENT, SERVER }

    private hunt.proton.engine.Sasl.SaslOutcome _outcome;
    private SaslState _state;

    private string _hostname;
    private bool _done;
    private Symbol[] _mechanisms;

    private Symbol _chosenMechanism;

    private Role _role;
    private bool _allowSkip = true;

    private SaslListener _saslListener;

    /**
     * Creates the SASL layer in the CLIENT role with no outcome and idle state.
     *
     * Params:
     *   transport    = transport used for logging/tracing and handed to the
     *                  frame parser and frame writer
     *   maxFrameSize = size passed to the frame parser and frame writer; also
     *                  the size of the input and output buffers allocated by
     *                  SaslTransportWrapper
     */
    this(TransportImpl transport, int maxFrameSize)
    {
        _role = Role.CLIENT;
        _outcome = hunt.proton.engine.Sasl.SaslOutcome.PN_SASL_NONE;
        _state = SaslState.PN_SASL_IDLE;
        _transport = transport;
        _maxFrameSize = maxFrameSize;
        _decoder = new DecoderImpl();
        _encoder = new EncoderImpl(_decoder);
        AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
        _frameParser = new SaslFrameParser(this, _decoder, maxFrameSize, _transport);
        _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, _transport);
    }

    /**
     * Marks the negotiation as finished with outcome PN_SASL_SYS.
     *
     * Also flags the init as sent (client) or received (server) so the
     * done/mode checks in this class no longer wait for it.
     */
    void fail() {
        if (_role == Role.CLIENT) {
            _initSent = true;
            logInfo("_initSent true !!!!!!!");
        } else {
            _initReceived = true;
        }
        _done = true;
        logInfo("_done true !!!!!!!");
        _outcome = hunt.proton.engine.Sasl.SaslOutcome.PN_SASL_SYS;
    }

    /**
     * Returns: true once the negotiation is done; in the server role this
     *          additionally requires that the init frame has been received.
     */
    override
    bool isDone()
    {
        return _done && (_role==Role.CLIENT || _initReceived);
    }

    /**
     * Writes whatever SASL output is currently due for the active role:
     * the header (once), then mechanisms/challenge/outcome for a server or
     * init/response for a client.
     */
    private void process()
    {
        processHeader();

        if(_role == Role.SERVER)
        {
            if(!_mechanismsSent && _mechanisms !is null)
            {
                SaslMechanisms mechanisms = new SaslMechanisms();

                mechanisms.setSaslServerMechanisms(new ArrayList!Symbol (_mechanisms));
                writeFrame(mechanisms);
                _mechanismsSent = true;
                _state = SaslState.PN_SASL_STEP;
            }

            if(getState() == SaslState.PN_SASL_STEP && getChallengeResponse() !is null)
            {
                SaslChallenge challenge = new SaslChallenge();
                challenge.setChallenge(getChallengeResponse());
                writeFrame(challenge);
                setChallengeResponse(null);
            }

            if(_done)
            {
                hunt.proton.amqp.security.SaslOutcome.SaslOutcome outcome =
                    new hunt.proton.amqp.security.SaslOutcome.SaslOutcome();
                outcome.setCode(SaslCode.values()[_outcome.getCode()]);
                if (_outcome == hunt.proton.engine.Sasl.SaslOutcome.PN_SASL_OK)
                {
                    outcome.setAdditionalData(getChallengeResponse());
                }
                writeFrame(outcome);
                setChallengeResponse(null);
            }
        }
        else if(_role == Role.CLIENT)
        {
            if(getState() == SaslState.PN_SASL_IDLE && _chosenMechanism !is null)
            {
                processInit();
                _state = SaslState.PN_SASL_STEP;

                //HACK: if we received an outcome before
                //we sent our init, change the state now
                if(_outcome != hunt.proton.engine.Sasl.SaslOutcome.PN_SASL_NONE)
                {
                    _state = classifyStateFromOutcome(_outcome);
                }
            }

            if(getState() == SaslState.PN_SASL_STEP && getChallengeResponse() !is null)
            {
                processResponse();
            }
        }
    }

    /// Hands a SASL frame body to the frame writer.
    private void writeFrame(SaslFrameBody frameBody)
    {
        _frameWriter.writeFrame(cast(Object)frameBody);
    }

    /**
     * Copies pending peer bytes into `bytes`.
     *
     * Params:
     *   bytes  = destination array
     *   offset = start index in `bytes`
     *   size   = maximum number of bytes to copy
     * Returns: the count reported by ByteBufferUtils.pourBufferToArray, or -1
     *          when nothing is pending. The pending buffer is dropped once it
     *          has been fully drained.
     */
    override
    int recv(byte[] bytes, int offset, int size)
    {
        if(_pending is null)
        {
            return -1;
        }
        int written = ByteBufferUtils.pourBufferToArray(_pending, bytes, offset, size);
        if(!_pending.hasRemaining())
        {
            _pending = null;
        }
        return written;
    }

    /**
     * Stores a copy of `bytes[offset .. offset + size]` as the next
     * challenge/response payload to be written.
     *
     * Returns: `size`
     */
    override
    int send(byte[] bytes, int offset, int size)
    {
        // .dup gives this object its own copy, so later changes to the
        // caller's array do not affect the queued payload.
        byte[] data = bytes[offset .. offset + size].dup;
        setChallengeResponse(new Binary(data));
        return size;
    }

    /**
     * Writes the SASL protocol header the first time it is called.
     *
     * Returns: the header length when the header was written by this call,
     *          otherwise 0.
     */
    int processHeader()
    {
        if(!_headerWritten)
        {
            logHeader();
            _frameWriter.writeHeader(AmqpHeader.SASL_HEADER);
            _headerWritten = true;
            return cast(int)(AmqpHeader.SASL_HEADER.length);
        }
        else
        {
            return 0;
        }
    }

    /// Reports the outgoing header to the transport log and protocol tracer
    /// when frame tracing is enabled.
    private void logHeader()
    {
        if (_transport.isFrameTracingEnabled())
        {
            _transport.log(TransportImpl.OUTGOING, new String(HEADER_DESCRIPTION));

            ProtocolTracer tracer = _transport.getProtocolTracer();
            if (tracer !is null)
            {
                tracer.sentHeader(HEADER_DESCRIPTION);
            }
        }
    }

    /// Returns: number of pending peer bytes readable via recv(), 0 if none.
    override
    int pending()
    {
        return _pending is null ? 0 : _pending.remaining();
    }

    /// Replaces the buffer of peer bytes exposed through recv()/pending().
    void setPending(ByteBuffer pending)
    {
        _pending = pending;
    }

    /// Returns: the current negotiation state.
    override
    SaslState getState()
    {
        return _state;
    }

    /// Returns: the locally queued challenge/response payload, or null.
    Binary getChallengeResponse()
    {
        return _challengeResponse;
    }

    /// Sets (or clears, with null) the locally queued challenge/response payload.
    void setChallengeResponse(Binary challengeResponse)
    {
        _challengeResponse = challengeResponse;
    }

    /**
     * Records the given mechanism names. In the CLIENT role the first name
     * also becomes the chosen mechanism.
     *
     * A null or empty array leaves the current mechanisms and chosen
     * mechanism untouched (previously this indexed element 0 of an empty
     * array in the CLIENT role).
     */
    override
    void setMechanisms(string[] mechanisms)
    {
        if(mechanisms is null || mechanisms.length == 0)
        {
            return;
        }

        _mechanisms = new Symbol[mechanisms.length];
        foreach(i, name; mechanisms)
        {
            _mechanisms[i] = Symbol.valueOf(name);
        }

        if(_role == Role.CLIENT)
        {
            _chosenMechanism = Symbol.valueOf(mechanisms[0]);
        }
    }

    /**
     * Returns: for a server, the mechanism chosen by the client (empty if
     *          none); for a client, the names of the stored mechanisms
     *          (empty if none), one entry per mechanism.
     */
    override
    string[] getRemoteMechanisms()
    {
        if(_role == Role.SERVER)
        {
            return _chosenMechanism is null ? [] : [ _chosenMechanism.toString() ];
        }
        else if(_role == Role.CLIENT)
        {
            if(_mechanisms is null)
            {
                return [];
            }
            else
            {
                // Pre-sized array filled by index. The earlier version also
                // appended every name with ~=, which returned each mechanism
                // twice in an array of double length.
                string[] remoteMechanisms = new string[_mechanisms.length];
                foreach(i, mechanism; _mechanisms)
                {
                    remoteMechanisms[i] = mechanism.toString();
                }
                return remoteMechanisms;
            }
        }
        else
        {
            throw new IllegalStateException();
        }
    }

    /// Sets the chosen mechanism directly.
    void setMechanism(Symbol mechanism)
    {
        _chosenMechanism = mechanism;
    }

    /// Returns: the chosen mechanism, or null if none has been set.
    Symbol getChosenMechanism()
    {
        return _chosenMechanism;
    }

    /// Exposes `initialResponse` as the pending bytes readable via recv().
    void setResponse(Binary initialResponse)
    {
        setPending(initialResponse.asByteBuffer());
    }

    /**
     * Entry point for decoded SASL frames: determines the concrete frame body
     * type, logs/traces it and dispatches to the matching handle* method via
     * the body's invoke(). Bodies of any other type are ignored.
     */
    override
    void handle(SaslFrameBody o, Binary payload)
    {
        SaslMechanisms frameBody = cast(SaslMechanisms)o;
        if (frameBody !is null)
        {
            _transport.log(TransportImpl.INCOMING, frameBody);

            ProtocolTracer tracer = _transport.getProtocolTracer();
            if( tracer !is null )
            {
                tracer.receivedSaslBody(frameBody);
            }

            frameBody.invoke(this, payload, null);
            return;
        }

        hunt.proton.amqp.security.SaslOutcome.SaslOutcome frameBodyOutCome =
            cast(hunt.proton.amqp.security.SaslOutcome.SaslOutcome)o;
        if (frameBodyOutCome !is null)
        {
            _transport.log(TransportImpl.INCOMING, frameBodyOutCome);

            ProtocolTracer tracer = _transport.getProtocolTracer();
            if( tracer !is null )
            {
                tracer.receivedSaslBody(frameBodyOutCome);
            }

            frameBodyOutCome.invoke(this, payload, null);
            return;
        }

        SaslInit frameBodyInit = cast(SaslInit)o;
        if (frameBodyInit !is null)
        {
            _transport.log(TransportImpl.INCOMING, frameBodyInit);

            ProtocolTracer tracer = _transport.getProtocolTracer();
            if( tracer !is null )
            {
                tracer.receivedSaslBody(frameBodyInit);
            }

            frameBodyInit.invoke(this, payload, null);
            return;
        }

        SaslResponse frameBodyResp = cast(SaslResponse)o;
        if (frameBodyResp !is null)
        {
            _transport.log(TransportImpl.INCOMING, frameBodyResp);

            ProtocolTracer tracer = _transport.getProtocolTracer();
            if( tracer !is null )
            {
                tracer.receivedSaslBody(frameBodyResp);
            }

            frameBodyResp.invoke(this, payload, null);
            return;
        }

        SaslChallenge frameBodyChall = cast(SaslChallenge)o;
        if (frameBodyChall !is null)
        {
            _transport.log(TransportImpl.INCOMING, frameBodyChall);

            ProtocolTracer tracer = _transport.getProtocolTracer();
            if( tracer !is null )
            {
                tracer.receivedSaslBody(frameBodyChall);
            }

            frameBodyChall.invoke(this, payload, null);
            return;
        }
    }

    /**
     * Handles the client's init frame: records hostname, chosen mechanism and
     * any initial response, then notifies the listener.
     */
    override
    void handleInit(SaslInit saslInit, Binary payload, Void context)
    {
        if(_role == Role.SERVER)
        {
            server();
        }
        checkRole(Role.SERVER);

        // processInit() in this class sends a null hostname when none is set,
        // so the received hostname must be null-checked before use.
        auto remoteHostname = saslInit.getHostname();
        _hostname = remoteHostname is null ? null : cast(string)(remoteHostname.getBytes());

        _chosenMechanism = saslInit.getMechanism();
        _initReceived = true;
        if(saslInit.getInitialResponse() !is null)
        {
            setPending(saslInit.getInitialResponse().asByteBuffer());
        }

        if(_saslListener !is null) {
            _saslListener.onSaslInit(this, _transport);
        }
    }

    /// Not implemented; reports that through implementationMissing.
    void invoke(SaslFrameBodyHandler!Void handler, Binary payload, Void context)
    {
        implementationMissing(false);
    }

    /// Handles a response frame: exposes its bytes via recv() and notifies the listener.
    override
    void handleResponse(SaslResponse saslResponse, Binary payload, Void context)
    {
        checkRole(Role.SERVER);
        setPending(saslResponse.getResponse() is null ? null : saslResponse.getResponse().asByteBuffer());

        if(_saslListener !is null) {
            _saslListener.onSaslResponse(this, _transport);
        }
    }

    /**
     * Completes the negotiation with the given outcome and moves the state to
     * PASS or FAIL accordingly.
     */
    override
    void done(hunt.proton.engine.Sasl.SaslOutcome outcome)
    {
        checkRole(Role.SERVER);
        _outcome = outcome;
        _done = true;
        logInfo("_done true !!!!!!!");
        _state = classifyStateFromOutcome(outcome);
        logInfo("SASL negotiation done");
    }

    /**
     * Role enforcement hook. The check that threw IllegalStateException on a
     * role mismatch was commented out in this port, so this is currently a
     * no-op; the call sites are kept so it can be re-enabled in one place.
     */
    private void checkRole(Role role)
    {
    }

    /**
     * Handles the server's mechanisms frame: stores the offered mechanisms
     * (adding ANONYMOUS when the frame carries none) and notifies the listener.
     */
    override
    void handleMechanisms(SaslMechanisms saslMechanisms, Binary payload, Void context)
    {
        if(_role == Role.CLIENT)
        {
            client();
        }
        checkRole(Role.CLIENT);
        if (saslMechanisms.getSaslServerMechanisms() is null)
        {
            _mechanisms ~= Symbol.valueOf("ANONYMOUS");
        }
        else {
            _mechanisms = saslMechanisms.getSaslServerMechanisms().toArray();
        }

        if(_saslListener !is null) {
            _saslListener.onSaslMechanisms(this, _transport);
        }
    }

    /// Handles a challenge frame: exposes its bytes via recv() and notifies the listener.
    override
    void handleChallenge(SaslChallenge saslChallenge, Binary payload, Void context)
    {
        checkRole(Role.CLIENT);
        setPending(saslChallenge.getChallenge() is null ? null : saslChallenge.getChallenge().asByteBuffer());

        if(_saslListener !is null) {
            _saslListener.onSaslChallenge(this, _transport);
        }
    }

    /**
     * Handles the server's outcome frame: exposes any additional data via
     * recv(), maps the wire code to the engine outcome, marks the negotiation
     * done and notifies the listener.
     */
    override
    void handleOutcome(hunt.proton.amqp.security.SaslOutcome.SaslOutcome saslOutcome,
                       Binary payload,
                       Void context)
    {
        checkRole(Role.CLIENT);

        // Does not depend on the loop below, so it is done once up front.
        setPending(saslOutcome.getAdditionalData() is null ? null : saslOutcome.getAdditionalData().asByteBuffer());

        foreach(hunt.proton.engine.Sasl.SaslOutcome outcome ; hunt.proton.engine.Sasl.SaslOutcome.values())
        {
            if(outcome.getCode() == saslOutcome.getCode().ordinal())
            {
                _outcome = outcome;
                // While still idle the init has not been sent yet; process()
                // applies the outcome to the state after sending it.
                if (_state != SaslState.PN_SASL_IDLE)
                {
                    _state = classifyStateFromOutcome(outcome);
                }
                break;
            }
        }
        _done = true;

        if(_saslListener !is null) {
            _saslListener.onSaslOutcome(this, _transport);
        }
    }

    /// Returns: PN_SASL_PASS for PN_SASL_OK, PN_SASL_FAIL for every other outcome.
    private SaslState classifyStateFromOutcome(hunt.proton.engine.Sasl.SaslOutcome outcome)
    {
        return outcome == hunt.proton.engine.Sasl.SaslOutcome.PN_SASL_OK ? SaslState.PN_SASL_PASS : SaslState.PN_SASL_FAIL;
    }

    /// Writes the queued payload as a response frame and clears it.
    private void processResponse()
    {
        SaslResponse response = new SaslResponse();
        response.setResponse(getChallengeResponse());
        setChallengeResponse(null);
        writeFrame(response);
    }

    /// Writes the init frame (hostname, chosen mechanism and, if queued, the
    /// initial response) and marks the init as sent.
    private void processInit()
    {
        SaslInit init = new SaslInit();

        init.setHostname(_hostname is null? null : new String(_hostname));
        init.setMechanism(_chosenMechanism);
        if(getChallengeResponse() !is null)
        {
            init.setInitialResponse(getChallengeResponse());
            setChallengeResponse(null);
        }
        _initSent = true;
        writeFrame(init);
    }

    /**
     * Switches to the client role, selects the PLAIN mechanism and queues the
     * initial response laid out as: 0 byte, username bytes, 0 byte, password
     * bytes.
     */
    void plain(String username, String password)
    {
        client();
        _chosenMechanism = Symbol.valueOf("PLAIN");
        byte[] usernameBytes = username.getBytes();
        byte[] passwordBytes = password.getBytes();
        // Zero-initialised, so indices 0 and 1 + usernameBytes.length stay 0
        // and act as the separators.
        byte[] data = new byte[usernameBytes.length+passwordBytes.length+2];
        data[1 .. 1+usernameBytes.length] = usernameBytes[0 .. usernameBytes.length ];
        data[2+usernameBytes.length .. 2+usernameBytes.length+passwordBytes.length] = passwordBytes[0 .. passwordBytes.length];
        setChallengeResponse(new Binary(data));
    }

    /// Returns: the negotiation outcome recorded so far.
    override
    hunt.proton.engine.Sasl.SaslOutcome getOutcome()
    {
        return _outcome;
    }

    /**
     * Switches to the client role. If mechanisms are already stored, the
     * first one becomes the chosen mechanism.
     */
    override
    void client()
    {
        _role = Role.CLIENT;
        // Length check rather than a null check: an empty, non-null slice
        // must not be indexed.
        if(_mechanisms.length != 0)
        {
            _chosenMechanism = _mechanisms[0];
        }
    }

    /// Switches to the server role.
    override
    void server()
    {
        _role = Role.SERVER;
    }

    /// Controls whether a server lets the sniffer decide between the SASL and
    /// plain wrappers (see wrap()).
    override
    void allowSkip(bool allowSkip)
    {
        _allowSkip = allowSkip;
    }

    /**
     * Wraps the given input/output in a SaslSniffer that chooses between the
     * SASL wrapper and a plain wrapper. Unless this is a server with skipping
     * allowed, the SASL wrapper is selected unconditionally.
     */
    override
    TransportWrapper wrap(TransportInput input, TransportOutput output)
    {
        return new class SaslSniffer {

            this()
            {
                super(new SwitchingSaslTransportWrapper(input, output),new PlainTransportWrapper(output, input));
            }

            override
            protected bool isDeterminationMade() {
                if (_role == Role.SERVER && _allowSkip) {
                    return super.isDeterminationMade();
                } else {
                    _selectedTransportWrapper = _wrapper1;
                    return true;
                }
            }
        };
    }

    /**
     * Transport wrapper used while SASL negotiation is in progress. Input is
     * buffered and fed to the SASL frame parser; output is produced by the
     * enclosing SaslImpl's frame writer. Once the respective direction leaves
     * SASL mode it asks its parent to switch to the underlying input/output.
     */
    class SaslTransportWrapper : TransportWrapper
    {
        private TransportInput _underlyingInput;
        private TransportOutput _underlyingOutput;
        private bool _outputComplete;

        private ByteBuffer _outputBuffer;
        private ByteBuffer _inputBuffer;
        // View over _outputBuffer handed out by head(); its limit tracks the
        // output buffer's write position.
        private ByteBuffer _head;

        private SwitchingSaslTransportWrapper _parent;

        this(SwitchingSaslTransportWrapper parent, TransportInput input, TransportOutput output)
        {
            _underlyingInput = input;
            _underlyingOutput = output;

            _inputBuffer = ByteBufferUtils.newWriteableBuffer(_maxFrameSize);
            _outputBuffer = ByteBufferUtils.newWriteableBuffer(_maxFrameSize);

            _parent = parent;

            if (_transport.isUseReadOnlyOutputBuffer()) {
                _head = _outputBuffer.asReadOnlyBuffer();
            } else {
                _head = _outputBuffer.duplicate();
            }

            _head.limit(0);
        }

        /// Produces SASL output while in SASL output mode and marks the
        /// output complete once the negotiation is done.
        private void fillOutputBuffer()
        {
            if(isOutputInSaslMode())
            {
                writeSaslOutput();
                if(_done)
                {
                    _outputComplete = true;
                }
            }
        }

        /**
         * TODO rationalise this method with respect to the other similar
         * checks of _role/_initReceived etc (see SaslImpl.isDone()).
         */
        private bool isInputInSaslMode()
        {
            return (_role == Role.CLIENT && !_done) || (_role == Role.SERVER && (!_initReceived || !_done));
        }

        private bool isOutputInSaslMode()
        {
            return (_role == Role.CLIENT && (!_done || !_initSent)) || (_role == Role.SERVER && !_outputComplete);
        }

        override
        int capacity()
        {
            if (_tail_closed) return Transport.END_OF_STREAM;
            if (isInputInSaslMode())
            {
                return _inputBuffer.remaining();
            }
            else
            {
                return _underlyingInput.capacity();
            }
        }

        override
        int position()
        {
            if (_tail_closed) return Transport.END_OF_STREAM;
            if (isInputInSaslMode())
            {
                return _inputBuffer.position();
            }
            else
            {
                return _underlyingInput.position();
            }
        }

        override
        ByteBuffer tail()
        {
            if (!isInputInSaslMode())
            {
                return _underlyingInput.tail();
            }

            return _inputBuffer;
        }

        /// Flips the input buffer for reading, processes it, and always
        /// compacts it afterwards so unread bytes are kept.
        override
        void process()
        {
            _inputBuffer.flip();

            try
            {
                reallyProcessInput();
            }
            finally
            {
                _inputBuffer.compact();
            }
        }

        /// Closes the input side; if that happens while still in SASL input
        /// mode the head is flagged closed as well.
        override
        void close_tail()
        {
            _tail_closed = true;
            if (isInputInSaslMode()) {
                _head_closed = true;
            }
            _underlyingInput.close_tail();
        }

        private void reallyProcessInput()
        {
            if(isInputInSaslMode())
            {
                _frameParser.input(_inputBuffer);
            }

            // Re-evaluated on purpose: parsing above may have completed the
            // negotiation, in which case leftover bytes belong to the
            // underlying input.
            if(!isInputInSaslMode())
            {
                if (_inputBuffer.hasRemaining())
                {
                    int bytes = ByteBufferUtils.pourAll(_inputBuffer, _underlyingInput);
                    if (bytes == Transport.END_OF_STREAM)
                    {
                        _tail_closed = true;
                    }

                    if (!_inputBuffer.hasRemaining())
                    {
                        _parent.switchToNextInput();
                    }
                }
                else
                {
                    _parent.switchToNextInput();
                }

                _underlyingInput.process();
            }
        }

        override
        int pending()
        {
            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
            {
                fillOutputBuffer();
                _head.limit(_outputBuffer.position());

                if (_head_closed && _outputBuffer.position() == 0)
                {
                    return Transport.END_OF_STREAM;
                }
                else
                {
                    return _outputBuffer.position();
                }
            }
            else
            {
                _parent.switchToNextOutput();
                return _underlyingOutput.pending();
            }
        }

        override
        ByteBuffer head()
        {
            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
            {
                pending();
                return _head;
            }
            else
            {
                _parent.switchToNextOutput();
                return _underlyingOutput.head();
            }
        }

        override
        void pop(int bytes)
        {
            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
            {
                // Drop the first `bytes` bytes from the output buffer and
                // re-align the head view with what is left.
                _outputBuffer.flip();
                _outputBuffer.position(bytes);
                _outputBuffer.compact();
                _head.position(0);
                _head.limit(_outputBuffer.position());
            }
            else
            {
                _parent.switchToNextOutput();
                _underlyingOutput.pop(bytes);
            }
        }

        override
        void close_head()
        {
            _parent.switchToNextOutput();
            _underlyingOutput.close_head();
        }

        /// Runs the enclosing SaslImpl's process() and moves the bytes it
        /// produced from the frame writer into the output buffer.
        private void writeSaslOutput()
        {
            this.outer.process();
            _frameWriter.readBytes(_outputBuffer);
        }
    }

    /**
     * Delegating wrapper that starts out routing input and output through a
     * SaslTransportWrapper and can be switched, per direction, to the
     * underlying input/output.
     */
    class SwitchingSaslTransportWrapper : TransportWrapper {

        private TransportInput _underlyingInput;
        private TransportOutput _underlyingOutput;

        private TransportInput currentInput;
        private TransportOutput currentOutput;

        this(TransportInput input, TransportOutput output) {
            _underlyingInput = input;
            _underlyingOutput = output;

            // The wrapper can be GC'd after both current's are switched to next.
            SaslTransportWrapper saslProcessor = new SaslTransportWrapper(this, input, output);

            currentInput = saslProcessor;
            currentOutput = saslProcessor;
        }

        int capacity() {
            return currentInput.capacity();
        }

        int position() {
            return currentInput.position();
        }

        ByteBuffer tail() {
            return currentInput.tail();
        }

        void process() {
            currentInput.process();
        }

        void close_tail() {
            currentInput.close_tail();
        }

        int pending() {
            return currentOutput.pending();
        }

        ByteBuffer head() {
            return currentOutput.head();
        }

        void pop(int bytes) {
            currentOutput.pop(bytes);
        }

        void close_head() {
            currentOutput.close_head();
        }

        /// Routes all further input calls to the underlying input.
        void switchToNextInput() {
            currentInput = _underlyingInput;
        }

        /// Routes all further output calls to the underlying output.
        void switchToNextOutput() {
            currentOutput = _underlyingOutput;
        }
    }

    /// Returns: the hostname set via setRemoteHostname() or received in the
    ///          init frame; null if neither happened.
    string getHostname()
    {
        return _hostname;
    }

    /// Sets the hostname that processInit() puts into the init frame.
    void setRemoteHostname(string hostname)
    {
        _hostname = hostname;
    }

    /// Registers the listener notified from the handle* methods (may be null).
    override
    void setListener(SaslListener saslListener) {
        _saslListener = saslListener;
    }
}