/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.proton.engine.impl.TransportOutputAdaptor;

import hunt.proton.engine.impl.ByteBufferUtils;

import hunt.io.ByteBuffer;
import hunt.proton.engine.impl.TransportImpl;
import hunt.proton.engine.Transport;
import hunt.proton.engine.impl.TransportOutputWriter;
import hunt.proton.engine.impl.TransportOutput;
import std.concurrency : initOnce;
import hunt.logging;

/**
 * Adapts a TransportOutputWriter to the TransportOutput interface.
 *
 * The writer fills an internal output buffer; callers observe the buffered
 * bytes through a second buffer object (the "head") and consume them with
 * pop(). Both buffers are allocated on demand and dropped again once the
 * output buffer is empty and larger than
 * TransportImpl.BUFFER_RELEASE_THRESHOLD.
 */
class TransportOutputAdaptor : TransportOutput
{
    /**
     * Process-wide zero-capacity read-only buffer, created on first use.
     *
     * Returned by head() whenever no head buffer is currently allocated.
     */
    static ByteBuffer _emptyHead()
    {
        __gshared ByteBuffer sharedEmpty;
        return initOnce!sharedEmpty(ByteBufferUtils.newReadableBuffer(0).asReadOnlyBuffer());
    }

    private TransportOutputWriter _transportOutputWriter;
    private int _maxFrameSize;

    // Class references default to null and bool to false in D, so only the
    // one non-default initial value is spelled out below.
    private ByteBuffer _outputBuffer;
    private ByteBuffer _head;
    private bool _output_done;
    private bool _head_closed;
    private bool _readOnlyHead = true;

    /**
     * Params:
     *   transportOutputWriter = source of the bytes to be buffered
     *   maxFrameSize          = capacity requested for the output buffer;
     *                           any value <= 0 selects 16 KiB instead
     *   readOnlyHead          = true to derive the head buffer with
     *                           asReadOnlyBuffer(), false to use duplicate()
     */
    this(TransportOutputWriter transportOutputWriter, int maxFrameSize, bool readOnlyHead)
    {
        _transportOutputWriter = transportOutputWriter;
        if (maxFrameSize > 0)
        {
            _maxFrameSize = maxFrameSize;
        }
        else
        {
            _maxFrameSize = 16 * 1024;
        }
        _readOnlyHead = readOnlyHead;
    }

    /**
     * Asks the writer for more output and reports how much is buffered.
     *
     * Returns: Transport.END_OF_STREAM if the head has been closed, or if the
     *          writer's writeInto() returned true and nothing is buffered;
     *          otherwise the output buffer's position (0 when the buffers
     *          have just been released).
     */
    public int pending()
    {
        if (_head_closed)
        {
            return Transport.END_OF_STREAM;
        }

        if (_outputBuffer is null)
        {
            init_buffers();
        }

        _output_done = _transportOutputWriter.writeInto(_outputBuffer);
        // Make everything written so far visible through the head buffer.
        _head.limit(_outputBuffer.position());

        release_buffers_if_drained();

        // The buffers may have just been released, hence the null check.
        int buffered = (_outputBuffer is null) ? 0 : _outputBuffer.position();
        if (_output_done && buffered == 0)
        {
            return Transport.END_OF_STREAM;
        }
        return buffered;
    }

    /**
     * Refreshes the pending output via pending() and returns the head buffer.
     *
     * Returns: the current head buffer, or the shared empty buffer when no
     *          head buffer is allocated.
     */
    public ByteBuffer head()
    {
        pending();
        if (_head is null)
        {
            return _emptyHead();
        }
        return _head;
    }

    /**
     * Discards the first `bytes` bytes of buffered output.
     *
     * Does nothing when no output buffer is allocated.
     *
     * Params:
     *   bytes = number of leading bytes to drop from the output buffer
     */
    public void pop(int bytes)
    {
        if (_outputBuffer is null)
        {
            return;
        }

        // flip / position / compact: presumably the usual NIO-style idiom for
        // moving the unconsumed tail to the front -- confirm against
        // hunt.io.ByteBuffer. The statement order is significant.
        _outputBuffer.flip();
        _outputBuffer.position(bytes);
        _outputBuffer.compact();

        // Re-align the head with what is left in the output buffer.
        _head.position(0);
        _head.limit(_outputBuffer.position());

        release_buffers_if_drained();
    }

    /**
     * Marks the head as closed, notifies the writer with closed(null) and
     * drops both buffers. Afterwards pending() returns
     * Transport.END_OF_STREAM.
     */
    public void close_head()
    {
        _head_closed = true;
        _transportOutputWriter.closed(null);
        release_buffers();
    }

    /// Allocates the output buffer and derives the head buffer from it.
    private void init_buffers()
    {
        _outputBuffer = ByteBufferUtils.newWriteableBuffer(_maxFrameSize);
        _head = _readOnlyHead ? _outputBuffer.asReadOnlyBuffer() : _outputBuffer.duplicate();
        // Nothing has been written yet, so nothing is exposed.
        _head.limit(0);
    }

    /// Releases the buffers when the output buffer is empty and its capacity
    /// exceeds TransportImpl.BUFFER_RELEASE_THRESHOLD. Requires a non-null
    /// output buffer.
    private void release_buffers_if_drained()
    {
        if (_outputBuffer.position() == 0
                && _outputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD)
        {
            release_buffers();
        }
    }

    /// Drops both buffer references; init_buffers() recreates them on demand.
    private void release_buffers()
    {
        _outputBuffer = null;
        _head = null;
    }
}