/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.proton.engine.Transport;

import hunt.io.ByteBuffer;

import hunt.proton.amqp.transport.ErrorCondition;
import hunt.proton.engine.impl.TransportImpl;
import hunt.proton.engine.Endpoint;
import hunt.proton.engine.Connection;
import hunt.proton.engine.TransportResult;
import hunt.proton.engine.Sasl;
import hunt.proton.engine.SslDomain;
import hunt.proton.engine.Ssl;
import hunt.proton.engine.SslPeerDetails;

/**
 * <p>
 * Operates on the entities in the associated {@link Connection}
 * by accepting and producing binary AMQP output, potentially
 * layered within SASL and/or SSL.
 * </p>
 * <p>
 * After a connection is bound with {@link #bind(Connection)}, the methods for accepting and producing
 * output are typically repeatedly called. See the specific methods for details of their legal usage.
 * </p>
 * <p>
 * <strong>Processing the input data received from another AMQP container.</strong>
 * </p>
 * <ol>
 * <li>{@link #getInputBuffer()} </li>
 * <li>Write data into input buffer</li>
 * <li>{@link #processInput()}</li>
 * <li>Check the result, e.g. by calling {@link TransportResult#checkIsOk()}</li>
 * </ol>
 * <p>
 * <strong>Getting the output data to send to another AMQP container:</strong>
 * </p>
 * <ol>
 * <li>{@link #getOutputBuffer()} </li>
 * <li>Read output from output buffer</li>
 * <li>{@link #outputConsumed()}</li>
 * </ol>
 *
 * <p>The following methods on the byte buffers returned by {@link #getInputBuffer()} and {@link #getOutputBuffer()}
 * must not be called:
 * </p>
 * <ol>
 * <li> {@link ByteBuffer#clear()} </li>
 * <li> {@link ByteBuffer#compact()} </li>
 * <li> {@link ByteBuffer#flip()} </li>
 * <li> {@link ByteBuffer#mark()} </li>
 * </ol>
 */
interface Transport : Endpoint
{

    /**
     * Factory for obtaining {@link Transport} instances without naming the
     * implementation class at the call site.
     */
    class Factory
    {
        /**
         * @return a newly constructed {@link TransportImpl}
         */
        public static Transport create() {
            return new TransportImpl();
        }
    }

    // Trace level constants. They are manifest constants (enum) rather than
    // mutable statics so that they cannot be reassigned at runtime and can be
    // used in compile-time contexts, matching the other constants below.
    // NOTE(review): the values are distinct bits, so they are presumably meant
    // to be OR-ed together and passed to trace(int) -- confirm against TransportImpl.
    enum int TRACE_OFF = 0;
    enum int TRACE_RAW = 1;
    enum int TRACE_FRM = 2;
    enum int TRACE_DRV = 4;

    // NOTE(review): -1 presumably acts as a "not configured" sentinel for the
    // max frame size -- confirm against TransportImpl.
    enum int DEFAULT_MAX_FRAME_SIZE = -1;

    /** the lower bound for the agreed maximum frame size (in bytes). */
    enum int MIN_MAX_FRAME_SIZE = 512;
    enum int SESSION_WINDOW = 16*1024;
    enum int END_OF_STREAM = -1;

    /**
     * Set the trace levels of this transport.
     *
     * @param levels the trace levels; presumably a combination of the TRACE_* constants (TODO confirm)
     */
    public void trace(int levels);

    /**
     * Bind the given connection to this transport; see the interface description
     * for the typical usage once a connection is bound.
     *
     * @param connection the connection to bind
     */
    public void bind(Connection connection);
    public void unbind();

    // Input-side primitives.
    // NOTE(review): capacity/tail/process/close_tail look like the input half of
    // a head/tail style buffer API; their exact contract is not documented here --
    // confirm against TransportImpl before relying on it.
    public int capacity();
    public ByteBuffer tail();
    public void process() ;
    public void close_tail();


    // Output-side primitives.
    // NOTE(review): pending/head/pop/close_head look like the output half of the
    // same API; their exact contract is not documented here -- confirm against
    // TransportImpl before relying on it.
    public int pending();
    public ByteBuffer head();
    public void pop(int bytes);
    public void close_head();

    public bool isClosed();

    /**
     * Processes the provided input.
     *
     * @param bytes input bytes for consumption
     * @param offset the offset within bytes where input begins
     * @param size the number of bytes available for input
     *
     * @return the number of bytes consumed
     * @throws TransportException if the input is invalid, if the transport is already in an error state,
     * or if the input is empty (unless the remote connection is already closed)
     * @deprecated use {@link #getInputBuffer()} and {@link #processInput()} instead.
     */
    public int input(byte[] bytes, int offset, int size);

    /**
     * Get a buffer that can be used to write input data into the transport.
     * Once the client has finished putting into the input buffer, {@link #processInput()}
     * must be called.
     *
     * Successive calls to this method are not guaranteed to return the same object.
     * Once {@link #processInput()} is called the buffer must not be used.
     *
     * @throws TransportException if the transport is already in an invalid state
     */
    ByteBuffer getInputBuffer();

    /**
     * Tell the transport to process the data written to the input buffer.
     *
     * If the returned result indicates failure, the transport will not accept any more input.
     * Specifically, any subsequent {@link #processInput()} calls on this object will
     * throw an exception.
     *
     * @return the result of processing the data, which indicates success or failure.
     * @see #getInputBuffer()
     */
    TransportResult processInput();

    /**
     * Has the transport produce up to size bytes placing the result
     * into dest beginning at position offset.
     *
     * @param dest array for output bytes
     * @param offset the offset within bytes where output begins
     * @param size the maximum number of bytes to be output
     *
     * @return the number of bytes written
     * @deprecated use {@link #getOutputBuffer()} and {@link #outputConsumed()} instead
     */
    public int output(byte[] dest, int offset, int size);

    /**
     * Get a read-only byte buffer containing the transport's pending output.
     * Once the client has finished getting from the output buffer, {@link #outputConsumed()}
     * must be called.
     *
     * Successive calls to this method are not guaranteed to return the same object.
     * Once {@link #outputConsumed()} is called the buffer must not be used.
     *
     * If the transport's state changes AFTER calling this method, this will not be
     * reflected in the output buffer.
     */
    ByteBuffer getOutputBuffer();

    /**
     * Informs the transport that the output buffer returned by {@link #getOutputBuffer()}
     * is finished with, allowing implementation-dependent steps to be performed such as
     * reclaiming buffer space.
     */
    void outputConsumed();

    /**
     * Signal the transport to expect SASL frames used to establish a SASL layer prior to
     * performing the AMQP protocol version negotiation. This must first be performed before
     * the transport is used for processing. Subsequent invocations will return the same
     * {@link Sasl} object.
     *
     * @throws IllegalStateException if transport processing has already begun prior to initial invocation
     */
    Sasl sasl() ;

    /**
     * Wrap this transport's output and input to apply SSL encryption and decryption respectively.
     *
     * This method is expected to be called at most once. A subsequent invocation will return the same
     * {@link Ssl} object, regardless of the parameters supplied.
     *
     * @param sslDomain the SSL settings to use
     * @param sslPeerDetails peer details, used for SNI, hostname verification, etc when connecting. May be null.
     * @return an {@link Ssl} object representing the SSL session.
     * @throws IllegalArgumentException if the sslDomain requests hostname verification but sslPeerDetails are null.
     * @throws IllegalStateException if the sslDomain has not been initialised.
     */
    Ssl ssl(SslDomain sslDomain, SslPeerDetails sslPeerDetails);

    /**
     * Equivalent to {@link #ssl(SslDomain, SslPeerDetails)} but passing null for SslPeerDetails, meaning no SNI detail
     * is sent, hostname verification isn't supported etc when connecting.
     *
     * @throws IllegalArgumentException if the sslDomain requests hostname verification.
     * @throws IllegalStateException if the sslDomain has not been initialised.
     */
    Ssl ssl(SslDomain sslDomain) ;


    /**
     * Get the maximum frame size for the transport
     *
     * @return the maximum frame size
     */
    int getMaxFrameSize();

    /**
     * Set the maximum frame size for the transport.
     *
     * @param size the maximum frame size
     */
    void setMaxFrameSize(int size);

    /**
     * @return the remote maximum frame size
     */
    int getRemoteMaxFrameSize();

    /**
     * Allows overriding the initial remote-max-frame-size to a value greater than the default 512 bytes. The value set
     * will be used until such time as the Open frame arrives from the peer and populates the remote max frame size.
     *
     * This method must be called before {@link #sasl()} in order to influence SASL behaviour.
     * @param size the remote frame size to use
     */
    void setInitialRemoteMaxFrameSize(int size);

    /**
     * Gets the local channel-max value to be advertised to the remote peer
     *
     * @return the local channel-max value
     * @see #setChannelMax(int)
     */
    int getChannelMax();

    /**
     * Set the local value of channel-max, to be advertised to the peer on the
     * <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-open">
     * Open frame</a> emitted by the transport.
     *
     * The remote peer's advertised channel-max can be observed using {@link #getRemoteChannelMax()}.
     *
     * @param channelMax the local channel-max to advertise to the peer, in range [0 - 2^16).
     * @throws IllegalArgumentException if the value supplied is outside range [0 - 2^16).
     */
    void setChannelMax(int channelMax);

    /**
     * Gets the remote value of channel-max, as advertised by the peer on its
     * <a href="http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-open">
     * Open frame</a>.
     *
     * The local peer's advertised channel-max can be observed using {@link #getChannelMax()}.
     *
     * @return the remote channel-max value
     */
    int getRemoteChannelMax();

    /**
     * @return the error condition associated with this transport
     *         (NOTE(review): presumably null when no error has occurred -- confirm)
     */
    ErrorCondition getCondition();

    /**
     *
     * @param timeout local idle timeout in milliseconds
     */
    void setIdleTimeout(int timeout);

    /**
     *
     * @return local idle timeout in milliseconds
     */
    int getIdleTimeout();

    /**
     *
     * @return remote idle timeout in milliseconds
     */
    int getRemoteIdleTimeout();

    /**
     * Prompt the transport to perform work such as idle-timeout/heartbeat handling, and return an
     * absolute deadline in milliseconds that tick must again be called by/at, based on the provided
     * current time in milliseconds, to ensure the periodic work is carried out as necessary.
     *
     * A returned deadline of 0 indicates there is no periodic work necessitating tick be called, e.g.
     * because neither peer has defined an idle-timeout value.
     *
     * The provided milliseconds time values can be from {@link System#currentTimeMillis()} or derived
     * from {@link System#nanoTime()}, noting that for the latter in particular that the returned
     * deadline could be a different sign than the given time, and (if non-zero) the returned
     * deadline should have the current time originally provided subtracted from it in order to
     * establish a relative time delay to the next deadline.
     *
     * NOTE(review): the {@code System} references above are Java API names; confirm which
     * D clock source callers of this port are expected to use.
     *
     * @param nowMillis the current time in milliseconds
     * @return the absolute deadline in milliseconds to next call tick by/at, or 0 if there is none.
     */
    long tick(long nowMillis);

    /**
     * @return the frames-input counter (NOTE(review): presumably the number of frames read so far -- confirm)
     */
    long getFramesInput();

    /**
     * @return the frames-output counter (NOTE(review): presumably the number of frames written so far -- confirm)
     */
    long getFramesOutput();

    /**
     * Configure whether a synthetic Flow event should be emitted when messages are sent,
     * reflecting a change in the credit level on the link that may prompt other action.
     *
     * Defaults to true.
     *
     * @param emitFlowEventOnSend true if a flow event should be emitted, false otherwise
     */
    void setEmitFlowEventOnSend(bool emitFlowEventOnSend);

    /**
     * @return whether a synthetic Flow event is emitted when messages are sent
     * @see #setEmitFlowEventOnSend(bool)
     */
    bool isEmitFlowEventOnSend();

    /**
     * Set an upper limit on the size of outgoing frames that will be sent
     * to the peer. Allows constraining the transport not to emit Transfer
     * frames over a given size even when the peer's max frame size allows it.
     *
     * Must be set before receiving the peer's Open frame to have effect.
     *
     * @param size the size limit to apply
     */
    void setOutboundFrameSizeLimit(int size);

    /**
     * @return the outbound frame size limit
     * @see #setOutboundFrameSizeLimit(int)
     */
    int getOutboundFrameSizeLimit();
}