1 /* 2 * hunt-proton: AMQP Protocol library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module hunt.proton.engine.impl.FrameWriter; 13 14 import hunt.io.ByteBuffer; 15 16 import hunt.proton.amqp.Binary; 17 import hunt.proton.amqp.security.SaslFrameBody; 18 import hunt.proton.amqp.transport.EmptyFrame; 19 import hunt.proton.amqp.transport.FrameBody; 20 import hunt.proton.codec.EncoderImpl; 21 import hunt.proton.codec.ReadableBuffer; 22 import hunt.proton.framing.TransportFrame; 23 import hunt.proton.engine.impl.TransportImpl; 24 import hunt.proton.engine.impl.FrameWriterBuffer; 25 import hunt.util.Common; 26 import std.algorithm; 27 import hunt.proton.engine.impl.ProtocolTracer; 28 import hunt.proton.amqp.transport.Begin; 29 import hunt.proton.amqp.transport.Open; 30 import hunt.proton.amqp.transport.Attach; 31 import hunt.proton.amqp.transport.Transfer; 32 import hunt.logging; 33 import hunt.util.Runnable; 34 35 /** 36 * Writes Frames to an internal buffer for later processing by the transport. 37 */ 38 class FrameWriter { 39 40 static int DEFAULT_FRAME_BUFFER_FULL_MARK = 64 * 1024; 41 static int FRAME_HEADER_SIZE = 8; 42 43 static byte AMQP_FRAME_TYPE = 0; 44 static byte SASL_FRAME_TYPE = 1; 45 46 private TransportImpl transport; 47 private EncoderImpl encoder; 48 private FrameWriterBuffer frameBuffer ;//= new FrameWriterBuffer(); 49 50 // Configuration of this Frame Writer 51 private int maxFrameSize; 52 private byte frameType; 53 private int frameBufferMaxBytes; //= DEFAULT_FRAME_BUFFER_FULL_MARK; 54 55 // State of current write operation, reset on start of each new write 56 private int frameStart; 57 58 // Frame Writer metrics 59 private long framesOutput; 60 61 this(EncoderImpl encoder, int maxFrameSize, byte frameType, TransportImpl transport) { 62 this.encoder = encoder; 63 this.maxFrameSize = maxFrameSize; 64 this.frameType = frameType; 65 this.transport = transport; 66 this.frameBufferMaxBytes = DEFAULT_FRAME_BUFFER_FULL_MARK; 67 this.frameBuffer = new FrameWriterBuffer(); 68 encoder.setByteBuffer(frameBuffer); 69 } 70 71 bool isFull() { 72 return frameBuffer.position() > frameBufferMaxBytes; 73 } 74 75 int readBytes(ByteBuffer dst) { 76 return frameBuffer.transferTo(dst); 77 } 78 79 long getFramesOutput() { 80 return framesOutput; 81 } 82 83 void setMaxFrameSize(int maxFrameSize) { 84 this.maxFrameSize = maxFrameSize; 85 } 86 87 void setFrameWriterMaxBytes(int maxBytes) { 88 this.frameBufferMaxBytes = maxBytes; 89 } 90 91 int getFrameWriterMaxBytes() { 92 return frameBufferMaxBytes; 93 } 94 95 void writeHeader(byte[] header) { 96 frameBuffer.put(header, 0, cast(int)header.length); 97 } 98 99 void writeFrame(Object frameBody) { 100 writeFrame(0, frameBody, null, null); 101 } 102 103 void writeFrame(int channel, Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) { 104 frameStart = frameBuffer.position(); 105 106 version(HUNT_AMQP_DEBUG) { 107 if (cast(Begin)frameBody !is null) 108 { 109 logInfo("."); 110 } 111 } 112 113 int performativeSize = writePerformative(frameBody, payload, onPayloadTooLarge); 114 if (cast(Begin)frameBody !is null) 115 { 116 version(HUNT_AMQP_DEBUG) logInfof("begin size: %d", performativeSize); 117 } 118 if (cast(Open)frameBody !is null) 119 { 120 version(HUNT_AMQP_DEBUG) logInfof("Open size: %d", performativeSize); 121 } 122 123 124 if (cast(Open)frameBody !is null) 125 { 126 version(HUNT_AMQP_DEBUG) tracef("Open size: %d", performativeSize); 127 } 128 if (cast(Attach)frameBody !is null) 129 { 130 Attach at = cast(Attach)frameBody; 131 version(HUNT_AMQP_DEBUG) tracef ("%s", at.toString); 132 } 133 134 int capacity = maxFrameSize > 0 ? maxFrameSize - performativeSize : 2147483647; 135 int payloadSize = min(payload is null ? 0 : payload.remaining(), capacity); 136 137 if (cast(Transfer)frameBody !is null) 138 { 139 version(HUNT_AMQP_DEBUG) tracef("Transfer size: %d ---%d ----%d", performativeSize, capacity , payloadSize); 140 } 141 142 if (transport.isFrameTracingEnabled()) { 143 logFrame(channel, frameBody, payload, payloadSize); 144 } 145 146 if (payloadSize > 0) { 147 int oldLimit = payload.limit(); 148 payload.limit(payload.position() + payloadSize); 149 frameBuffer.put(payload); 150 payload.limit(oldLimit); 151 } 152 153 endFrame(channel); 154 155 framesOutput++; 156 } 157 158 private int writePerformative(Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) { 159 frameBuffer.position(frameStart + FRAME_HEADER_SIZE); 160 161 if (frameBody !is null) { 162 encoder.writeObject(frameBody); 163 } 164 165 int performativeSize = frameBuffer.position() - frameStart; 166 167 if (onPayloadTooLarge !is null && maxFrameSize > 0 && payload !is null && (payload.remaining() + performativeSize) > maxFrameSize) { 168 // Next iteration will re-encode the frame body again with updates from the <payload-to-large> 169 // handler and then we can move onto the body portion. 170 onPayloadTooLarge.run(); 171 performativeSize = writePerformative(frameBody, payload, null); 172 } 173 174 return performativeSize; 175 } 176 177 private void endFrame(int channel) { 178 int frameSize = frameBuffer.position() - frameStart; 179 int originalPosition = frameBuffer.position(); 180 181 frameBuffer.position(frameStart); 182 frameBuffer.putInt(frameSize); 183 frameBuffer.put(cast(byte) 2); 184 frameBuffer.put(frameType); 185 frameBuffer.putShort(cast(short) channel); 186 frameBuffer.position(originalPosition); 187 } 188 189 private void logFrame(int channel, Object frameBody, ReadableBuffer payload, int payloadSize) { 190 ProtocolTracer tracer = transport.getProtocolTracer(); 191 if (frameType == AMQP_FRAME_TYPE) { 192 ReadableBuffer originalPayload = null; 193 if (payload !is null) { 194 originalPayload = payload.slice(); 195 originalPayload.limit(payloadSize); 196 } 197 198 Binary payloadBin = Binary.create(originalPayload); 199 FrameBody bd = null; 200 if (frameBody is null) { 201 bd = EmptyFrame.INSTANCE; 202 } else { 203 bd = cast(FrameBody) frameBody; 204 } 205 206 TransportFrame frame = new TransportFrame(channel, bd, payloadBin); 207 208 transport.log(TransportImpl.OUTGOING, frame); 209 210 if (tracer !is null) { 211 tracer.sentFrame(frame); 212 } 213 } else { 214 SaslFrameBody bd = cast(SaslFrameBody) frameBody; 215 transport.log(TransportImpl.OUTGOING, bd); 216 if (tracer !is null) { 217 tracer.sentSaslBody(bd); 218 } 219 } 220 } 221 }