1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.HandshakeSniffingTransportWrapper;
13 
14 import hunt.io.ByteBuffer;
15 import hunt.io.BufferUtils;
16 import hunt.proton.engine.Transport;
17 import hunt.proton.engine.TransportException;
18 import hunt.proton.engine.impl.TransportWrapper;
19 import hunt.Exceptions;
20 import std.concurrency : initOnce;
21 
22 
/**
 * Transport wrapper that buffers the first bytes of incoming data, hands them
 * to makeDetermination(), and from then on forwards every call to
 * _selectedTransportWrapper.
 *
 * Until a wrapper has been selected, input is collected in an internal buffer
 * of bufferSize() bytes and the output side reports nothing to send.
 *
 * Params:
 *   T1 = type of the first candidate wrapper
 *   T2 = type of the second candidate wrapper
 */
class HandshakeSniffingTransportWrapper(T1 , T2): TransportWrapper
{
    /// First candidate wrapper, stored for use by subclasses.
    protected T1 _wrapper1;
    /// Second candidate wrapper, stored for use by subclasses.
    protected T2 _wrapper2;

    // Set by close_tail(), regardless of whether a wrapper has been selected.
    private bool _tail_closed = false;
    // Set by close_head() only while no wrapper has been selected.
    private bool _head_closed = false;

    /// The wrapper all calls are forwarded to; null until a determination is made.
    protected TransportWrapper _selectedTransportWrapper;

    // Collects the leading input bytes that makeDetermination() inspects.
    private ByteBuffer _determinationBuffer;

    /**
     * Stores both candidate wrappers and allocates the determination buffer.
     *
     * Note that bufferSize() is invoked from this constructor, after the two
     * wrappers have been assigned.
     */
    this(T1 wrapper1,T2 wrapper2)
    {
        this._wrapper1 = wrapper1;
        this._wrapper2 = wrapper2;
        this._determinationBuffer = BufferUtils.allocate(bufferSize());
    }

    /**
     * Returns: the selected wrapper's capacity once a determination is made;
     * otherwise Transport.END_OF_STREAM if the tail is closed, else the space
     * remaining in the determination buffer.
     */
    public int capacity()
    {
        if (isDeterminationMade())
            return _selectedTransportWrapper.capacity();

        if (_tail_closed)
            return Transport.END_OF_STREAM;

        return _determinationBuffer.remaining();
    }

    /**
     * Returns: the selected wrapper's position once a determination is made;
     * otherwise Transport.END_OF_STREAM if the tail is closed, else the
     * current position of the determination buffer.
     */
    public int position()
    {
        if (isDeterminationMade())
            return _selectedTransportWrapper.position();

        if (_tail_closed)
            return Transport.END_OF_STREAM;

        return _determinationBuffer.position();
    }

    /**
     * Returns: the buffer input should be written into - the selected
     * wrapper's tail, or the determination buffer while undecided.
     */
    public ByteBuffer tail()
    {
        if (isDeterminationMade())
            return _selectedTransportWrapper.tail();

        return _determinationBuffer;
    }

    /// Number of bytes to collect before makeDetermination() is called.
    protected abstract int bufferSize();

    /**
     * Inspects the collected bytes. process() dereferences
     * _selectedTransportWrapper immediately after this call, so
     * implementations are expected to assign it here.
     */
    protected abstract void makeDetermination(byte[] bytes);

    /**
     * Processes pending input.
     *
     * With a wrapper selected this simply delegates. Otherwise, once the
     * determination buffer is full, its contents are passed to
     * makeDetermination(), replayed into the selected wrapper's tail and
     * processed there.
     *
     * Throws: TransportException("connection aborted") when the tail was
     * closed before the determination buffer filled up.
     */
    public void process() 
    {
        if (isDeterminationMade())
        {
            _selectedTransportWrapper.process();
            return;
        }

        if (_determinationBuffer.remaining() != 0)
        {
            // Not enough bytes yet; that is only an error if no more can arrive.
            if (_tail_closed)
                throw new TransportException("connection aborted");
            return;
        }

        // Buffer is full: switch it to read mode and copy out the sniffed bytes.
        _determinationBuffer.flip();
        byte[] sniffedBytes = new byte[_determinationBuffer.remaining()];
        _determinationBuffer.get(sniffedBytes);
        makeDetermination(sniffedBytes);

        // Reset the read position so the same bytes can be replayed below.
        _determinationBuffer.rewind();

        // TODO what if the selected transport has insufficient capacity?? Maybe use pour, and then try to finish pouring next time round.
        _selectedTransportWrapper.tail().put(_determinationBuffer);
        _selectedTransportWrapper.process();
    }

    /**
     * Closes the input side. The closed flag is recorded even if the check or
     * the delegated close_tail() throws.
     */
    public void close_tail()
    {
        scope(exit) _tail_closed = true;

        if (isDeterminationMade())
            _selectedTransportWrapper.close_tail();
    }

    /**
     * Returns: Transport.END_OF_STREAM if the head was closed before a wrapper
     * was selected; otherwise the selected wrapper's pending count, or 0 while
     * undecided.
     */
    public int pending()
    {
        if (_head_closed)
            return Transport.END_OF_STREAM;

        if (!isDeterminationMade())
            return 0;

        return _selectedTransportWrapper.pending();
    }

    /**
     * Zero-capacity buffer, created once via initOnce and returned from every
     * call (used by head() while undecided).
     */
    static ByteBuffer  EMPTY() {
        __gshared ByteBuffer  emptyBuffer;
        return initOnce!emptyBuffer(BufferUtils.allocate(0));
    }

    /**
     * Returns: the selected wrapper's head, or the EMPTY buffer while
     * undecided.
     */
    public ByteBuffer head()
    {
        if (!isDeterminationMade())
            return EMPTY;

        return _selectedTransportWrapper.head();
    }

    /**
     * Forwards to the selected wrapper when there is one.
     *
     * Throws: IllegalStateException when undecided and bytes is positive.
     */
    public void pop(int bytes)
    {
        if (isDeterminationMade())
        {
            _selectedTransportWrapper.pop(bytes);
            return;
        }

        if (bytes > 0)
            throw new IllegalStateException("no bytes have been read");
    }

    /**
     * Closes the output side: delegated when a wrapper is selected, otherwise
     * remembered locally so pending() reports Transport.END_OF_STREAM.
     */
    public void close_head()
    {
        if (isDeterminationMade())
        {
            _selectedTransportWrapper.close_head();
            return;
        }

        _head_closed = true;
    }

    /// True once _selectedTransportWrapper has been assigned.
    protected bool isDeterminationMade()
    {
        return !(_selectedTransportWrapper is null);
    }

}