/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.proton.engine.impl.HandshakeSniffingTransportWrapper;

import hunt.io.ByteBuffer;
import hunt.io.BufferUtils;
import hunt.proton.engine.Transport;
import hunt.proton.engine.TransportException;
import hunt.proton.engine.impl.TransportWrapper;
import hunt.Exceptions;
import std.concurrency : initOnce;


/**
 * Transport wrapper that buffers the first `bufferSize()` input bytes, hands
 * them to `makeDetermination`, and from then on forwards every call to
 * `_selectedTransportWrapper`.
 *
 * Subclasses supply `bufferSize()` and `makeDetermination`. This class never
 * assigns `_selectedTransportWrapper` itself, so `makeDetermination` is
 * expected to do so (presumably choosing one of the two wrapped candidates --
 * confirm against the concrete subclasses).
 */
class HandshakeSniffingTransportWrapper(T1 , T2): TransportWrapper
{

    /// First candidate wrapper, stored for use by subclasses.
    protected T1 _wrapper1;
    /// Second candidate wrapper, stored for use by subclasses.
    protected T2 _wrapper2;

    // Set by close_tail() regardless of whether a determination was made.
    private bool _tail_closed = false;
    // Set by close_head() only while no determination has been made yet.
    private bool _head_closed = false;
    /// Wrapper that receives all calls once chosen; null until then.
    protected TransportWrapper _selectedTransportWrapper;

    // Holds the leading input bytes until there are enough to decide.
    private ByteBuffer _determinationBuffer;

    /**
     * Stores both candidate wrappers and allocates the determination buffer.
     *
     * Note that the buffer size comes from the abstract `bufferSize()`, which
     * is therefore invoked during construction.
     */
    this(T1 wrapper1,T2 wrapper2)
    {
        _wrapper1 = wrapper1;
        _wrapper2 = wrapper2;
        _determinationBuffer = BufferUtils.allocate(bufferSize());
    }

    /**
     * Returns: the selected wrapper's capacity once a determination exists;
     * otherwise `Transport.END_OF_STREAM` if the tail has been closed, else
     * the space remaining in the determination buffer.
     */
    public int capacity()
    {
        if (isDeterminationMade())
        {
            return _selectedTransportWrapper.capacity();
        }
        if (_tail_closed)
        {
            return Transport.END_OF_STREAM;
        }
        return _determinationBuffer.remaining();
    }

    /**
     * Returns: the selected wrapper's position once a determination exists;
     * otherwise `Transport.END_OF_STREAM` if the tail has been closed, else
     * the current position of the determination buffer.
     */
    public int position()
    {
        if (isDeterminationMade())
        {
            return _selectedTransportWrapper.position();
        }
        if (_tail_closed)
        {
            return Transport.END_OF_STREAM;
        }
        return _determinationBuffer.position();
    }

    /**
     * Returns: the buffer input should be written into -- the selected
     * wrapper's tail once a determination exists, the determination buffer
     * before that.
     */
    public ByteBuffer tail()
    {
        if (isDeterminationMade())
        {
            return _selectedTransportWrapper.tail();
        }
        return _determinationBuffer;
    }

    /// Number of leading bytes to collect before `makeDetermination` is called.
    protected abstract int bufferSize();

    /// Inspects the collected leading bytes; see the class comment for the
    /// expectation that this assigns `_selectedTransportWrapper`.
    protected abstract void makeDetermination(byte[] bytes);

    /**
     * Processes pending input.
     *
     * With a determination in place this simply delegates. Without one, it
     * waits until the determination buffer is completely full, then runs the
     * determination, replays the buffered bytes into the selected wrapper's
     * tail and processes them there.
     *
     * Throws: TransportException if the tail was closed before the
     * determination buffer filled up.
     */
    public void process()
    {
        if (isDeterminationMade())
        {
            _selectedTransportWrapper.process();
            return;
        }

        if (_determinationBuffer.remaining() != 0)
        {
            // Not enough bytes yet; that is only an error if no more can arrive.
            if (_tail_closed)
            {
                throw new TransportException("connection aborted");
            }
            return;
        }

        // Copy the buffered bytes out for inspection.
        _determinationBuffer.flip();
        byte[] sniffed = new byte[_determinationBuffer.remaining()];
        _determinationBuffer.get(sniffed);
        makeDetermination(sniffed);

        // Reset the read position so the same bytes can be replayed downstream.
        _determinationBuffer.rewind();

        // TODO what if the selected transport has insufficient capacity?? Maybe use pour, and then try to finish pouring next time round.
        _selectedTransportWrapper.tail().put(_determinationBuffer);
        _selectedTransportWrapper.process();
    }


    /**
     * Closes the input side. The selected wrapper (if any) is closed first;
     * the local closed flag is set afterwards even if that call throws.
     */
    public void close_tail()
    {
        scope(exit) _tail_closed = true;

        if (isDeterminationMade())
        {
            _selectedTransportWrapper.close_tail();
        }
    }


    /**
     * Returns: `Transport.END_OF_STREAM` if the head was closed before a
     * determination was made; otherwise the selected wrapper's pending count,
     * or 0 while no wrapper has been selected.
     */
    public int pending()
    {
        if (_head_closed)
        {
            return Transport.END_OF_STREAM;
        }
        return isDeterminationMade() ? _selectedTransportWrapper.pending() : 0;
    }

    /**
     * Shared zero-length buffer, created on first use via `initOnce` and kept
     * in a `__gshared` variable (takes the place of a plain static field
     * initializer).
     */
    static ByteBuffer EMPTY() {
        __gshared ByteBuffer inst;
        return initOnce!inst(BufferUtils.allocate(0));
    }


    /**
     * Returns: the selected wrapper's head buffer, or the shared empty buffer
     * while no determination has been made.
     */
    public ByteBuffer head()
    {
        if (isDeterminationMade())
        {
            return _selectedTransportWrapper.head();
        }
        return EMPTY();
    }


    /**
     * Forwards to the selected wrapper's `pop`.
     *
     * Throws: IllegalStateException if no determination has been made and
     * `bytes` is positive.
     */
    public void pop(int bytes)
    {
        if (isDeterminationMade())
        {
            _selectedTransportWrapper.pop(bytes);
            return;
        }
        if (bytes > 0)
        {
            throw new IllegalStateException("no bytes have been read");
        }
    }


    /**
     * Closes the output side: delegates when a wrapper has been selected,
     * otherwise just records the closure locally.
     */
    public void close_head()
    {
        if (isDeterminationMade())
        {
            _selectedTransportWrapper.close_head();
            return;
        }
        _head_closed = true;
    }

    /// True once `_selectedTransportWrapper` has been assigned.
    protected bool isDeterminationMade()
    {
        return !(_selectedTransportWrapper is null);
    }

}