1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.proton.engine.impl.TransportOutputAdaptor;
12 
13 import hunt.proton.engine.impl.ByteBufferUtils;
14 
15 import hunt.io.ByteBuffer;
16 import hunt.proton.engine.impl.TransportImpl;
17 import hunt.proton.engine.Transport;
18 import hunt.proton.engine.impl.TransportOutputWriter;
19 import hunt.proton.engine.impl.TransportOutput;
20 import std.concurrency : initOnce;
21 import hunt.logging;
22 
/**
 * TransportOutput implementation that stages the bytes produced by a
 * TransportOutputWriter in an internal buffer and exposes them through a
 * second buffer view (the "head").
 *
 * The buffers are allocated on demand by pending() and dropped again when the
 * staging buffer is empty and its capacity exceeds
 * TransportImpl.BUFFER_RELEASE_THRESHOLD, or when the head is closed.
 */
class TransportOutputAdaptor : TransportOutput
{
    /// Frame size used when the constructor is given a non-positive value.
    private enum int DEFAULT_MAX_FRAME_SIZE = 16 * 1024;

    /**
     * Shared zero-length read-only buffer, created on first use via initOnce.
     * head() hands this out whenever no head view is currently allocated.
     */
    static ByteBuffer _emptyHead()
    {
        __gshared ByteBuffer emptyBuffer;
        return initOnce!emptyBuffer(ByteBufferUtils.newReadableBuffer(0).asReadOnlyBuffer());
    }

    private
    {
        TransportOutputWriter _transportOutputWriter;
        int _maxFrameSize;

        // Staging buffer the writer fills, and the view of it returned by head().
        ByteBuffer _outputBuffer = null;
        ByteBuffer _head = null;

        // Last value returned by the writer's writeInto().
        bool _output_done = false;
        // Set by close_head(); once true pending() always reports END_OF_STREAM.
        bool _head_closed = false;
        // Selects asReadOnlyBuffer() over duplicate() when creating the head view.
        bool _readOnlyHead = true;
    }

    /**
     * Params:
     *   transportOutputWriter = source of the bytes to stage
     *   maxFrameSize          = capacity of the staging buffer; values <= 0
     *                           select the 16 KiB default
     *   readOnlyHead          = whether head() exposes a read-only view
     */
    this(TransportOutputWriter transportOutputWriter, int maxFrameSize, bool readOnlyHead)
    {
        _transportOutputWriter = transportOutputWriter;
        if (maxFrameSize > 0)
        {
            _maxFrameSize = maxFrameSize;
        }
        else
        {
            _maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
        }
        _readOnlyHead = readOnlyHead;
    }

    /**
     * Asks the writer to write into the staging buffer and reports how many
     * bytes are now staged.
     *
     * Returns: Transport.END_OF_STREAM if the head has been closed, or if the
     *          writer's writeInto() returned true and nothing is staged;
     *          otherwise the staging buffer's position (0 when the buffers
     *          were just released).
     */
    public int pending()
    {
        if (_head_closed)
        {
            return Transport.END_OF_STREAM;
        }

        if (_outputBuffer is null)
        {
            init_buffers();
        }

        _output_done = _transportOutputWriter.writeInto(_outputBuffer);
        // Make everything written so far visible through the head view.
        _head.limit(_outputBuffer.position());

        release_buffers_if_drained();

        if (_outputBuffer is null)
        {
            // Buffers were released just above, so nothing is staged.
            return _output_done ? Transport.END_OF_STREAM : 0;
        }

        auto staged = _outputBuffer.position();
        if (_output_done && staged == 0)
        {
            return Transport.END_OF_STREAM;
        }
        return staged;
    }

    /**
     * Refreshes the staged output through pending() and returns the head view.
     *
     * Returns: the current head buffer, or the shared empty buffer when no
     *          head is allocated.
     */
    public ByteBuffer head()
    {
        pending();
        if (_head is null)
        {
            return _emptyHead();
        }
        return _head;
    }

    /**
     * Discards the first `bytes` staged bytes and realigns the head view with
     * what remains. Does nothing when no staging buffer is allocated.
     *
     * Params:
     *   bytes = number of bytes to drop from the front of the staged output
     */
    public void pop(int bytes)
    {
        if (_outputBuffer is null)
        {
            return;
        }

        // Switch to reading, skip the consumed prefix, then compact so the
        // remainder sits at the start and the buffer is writable again.
        _outputBuffer.flip();
        _outputBuffer.position(bytes);
        _outputBuffer.compact();

        _head.position(0);
        _head.limit(_outputBuffer.position());

        release_buffers_if_drained();
    }

    /**
     * Marks the head as closed, calls closed(null) on the writer and drops
     * the buffers.
     */
    public void close_head()
    {
        _head_closed = true;
        _transportOutputWriter.closed(null);
        release_buffers();
    }

    /// Allocates the staging buffer and an initially empty head view of it.
    private void init_buffers()
    {
        _outputBuffer = ByteBufferUtils.newWriteableBuffer(_maxFrameSize);
        _head = _readOnlyHead ? _outputBuffer.asReadOnlyBuffer() : _outputBuffer.duplicate();
        _head.limit(0);
    }

    /**
     * Releases the buffers when the staging buffer holds no bytes and its
     * capacity is above TransportImpl.BUFFER_RELEASE_THRESHOLD.
     * Requires _outputBuffer to be non-null.
     */
    private void release_buffers_if_drained()
    {
        if (_outputBuffer.position() == 0
                && _outputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD)
        {
            release_buffers();
        }
    }

    /// Drops both buffer references; pending() re-creates them on demand.
    private void release_buffers()
    {
        _head = null;
        _outputBuffer = null;
    }
}