1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.FrameWriter;
13 
14 import hunt.io.ByteBuffer;
15 
16 import hunt.proton.amqp.Binary;
17 import hunt.proton.amqp.security.SaslFrameBody;
18 import hunt.proton.amqp.transport.EmptyFrame;
19 import hunt.proton.amqp.transport.FrameBody;
20 import hunt.proton.codec.EncoderImpl;
21 import hunt.proton.codec.ReadableBuffer;
22 import hunt.proton.framing.TransportFrame;
23 import hunt.proton.engine.impl.TransportImpl;
24 import hunt.proton.engine.impl.FrameWriterBuffer;
25 import hunt.util.Common;
26 import std.algorithm;
27 import hunt.proton.engine.impl.ProtocolTracer;
28 import  hunt.proton.amqp.transport.Begin;
29 import  hunt.proton.amqp.transport.Open;
30 import  hunt.proton.amqp.transport.Attach;
31 import  hunt.proton.amqp.transport.Transfer;
32 import hunt.logging;
33 import hunt.util.Runnable;
34 
35 /**
36  * Writes Frames to an internal buffer for later processing by the transport.
37  */
38 class FrameWriter {
39 
40     static int DEFAULT_FRAME_BUFFER_FULL_MARK = 64 * 1024;
41     static int FRAME_HEADER_SIZE = 8;
42 
43     static byte AMQP_FRAME_TYPE = 0;
44     static byte SASL_FRAME_TYPE = 1;
45 
46     private TransportImpl transport;
47     private EncoderImpl encoder;
48     private FrameWriterBuffer frameBuffer ;//= new FrameWriterBuffer();
49 
50     // Configuration of this Frame Writer
51     private int maxFrameSize;
52     private byte frameType;
53     private int frameBufferMaxBytes; //= DEFAULT_FRAME_BUFFER_FULL_MARK;
54 
55     // State of current write operation, reset on start of each new write
56     private int frameStart;
57 
58     // Frame Writer metrics
59     private long framesOutput;
60 
61     this(EncoderImpl encoder, int maxFrameSize, byte frameType, TransportImpl transport) {
62         this.encoder = encoder;
63         this.maxFrameSize = maxFrameSize;
64         this.frameType = frameType;
65         this.transport = transport;
66         this.frameBufferMaxBytes = DEFAULT_FRAME_BUFFER_FULL_MARK;
67         this.frameBuffer = new FrameWriterBuffer();
68         encoder.setByteBuffer(frameBuffer);
69     }
70 
71     bool isFull() {
72         return frameBuffer.position() > frameBufferMaxBytes;
73     }
74 
75     int readBytes(ByteBuffer dst) {
76         return frameBuffer.transferTo(dst);
77     }
78 
79     long getFramesOutput() {
80         return framesOutput;
81     }
82 
83     void setMaxFrameSize(int maxFrameSize) {
84         this.maxFrameSize = maxFrameSize;
85     }
86 
87     void setFrameWriterMaxBytes(int maxBytes) {
88         this.frameBufferMaxBytes = maxBytes;
89     }
90 
91     int getFrameWriterMaxBytes() {
92         return frameBufferMaxBytes;
93     }
94 
95     void writeHeader(byte[] header) {
96         frameBuffer.put(header, 0, cast(int)header.length);
97     }
98 
99     void writeFrame(Object frameBody) {
100         writeFrame(0, frameBody, null, null);
101     }
102 
103     void writeFrame(int channel, Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) {
104         frameStart = frameBuffer.position();
105 
106         version(HUNT_AMQP_DEBUG) {
107             if (cast(Begin)frameBody !is null)
108             {
109                 logInfo(".");
110             }
111         }
112 
113         int performativeSize = writePerformative(frameBody, payload, onPayloadTooLarge);
114         if (cast(Begin)frameBody !is null)
115         {
116           version(HUNT_AMQP_DEBUG) logInfof("begin size: %d", performativeSize);
117         }
118         if (cast(Open)frameBody !is null)
119         {
120           version(HUNT_AMQP_DEBUG) logInfof("Open size: %d", performativeSize);
121         }
122 
123 
124         if (cast(Open)frameBody !is null)
125         {
126           version(HUNT_AMQP_DEBUG) tracef("Open size: %d", performativeSize);
127         }
128         if (cast(Attach)frameBody !is null)
129         {
130             Attach at = cast(Attach)frameBody;
131             version(HUNT_AMQP_DEBUG)  tracef ("%s", at.toString);
132         }
133 
134         int capacity = maxFrameSize > 0 ? maxFrameSize - performativeSize : 2147483647;
135         int payloadSize = min(payload is null ? 0 : payload.remaining(), capacity);
136 
137         if (cast(Transfer)frameBody !is null)
138         {
139             version(HUNT_AMQP_DEBUG)  tracef("Transfer size: %d ---%d ----%d", performativeSize, capacity , payloadSize);
140         }
141 
142         if (transport.isFrameTracingEnabled()) {
143             logFrame(channel, frameBody, payload, payloadSize);
144         }
145 
146         if (payloadSize > 0) {
147             int oldLimit = payload.limit();
148             payload.limit(payload.position() + payloadSize);
149             frameBuffer.put(payload);
150             payload.limit(oldLimit);
151         }
152 
153         endFrame(channel);
154 
155         framesOutput++;
156     }
157 
158     private int writePerformative(Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) {
159         frameBuffer.position(frameStart + FRAME_HEADER_SIZE);
160 
161         if (frameBody !is null) {
162             encoder.writeObject(frameBody);
163         }
164 
165         int performativeSize = frameBuffer.position() - frameStart;
166 
167         if (onPayloadTooLarge !is null && maxFrameSize > 0 && payload !is null && (payload.remaining() + performativeSize) > maxFrameSize) {
168             // Next iteration will re-encode the frame body again with updates from the <payload-to-large>
169             // handler and then we can move onto the body portion.
170             onPayloadTooLarge.run();
171             performativeSize = writePerformative(frameBody, payload, null);
172         }
173 
174         return performativeSize;
175     }
176 
177     private void endFrame(int channel) {
178         int frameSize = frameBuffer.position() - frameStart;
179         int originalPosition = frameBuffer.position();
180 
181         frameBuffer.position(frameStart);
182         frameBuffer.putInt(frameSize);
183         frameBuffer.put(cast(byte) 2);
184         frameBuffer.put(frameType);
185         frameBuffer.putShort(cast(short) channel);
186         frameBuffer.position(originalPosition);
187     }
188 
189     private void logFrame(int channel, Object frameBody, ReadableBuffer payload, int payloadSize) {
190         ProtocolTracer tracer = transport.getProtocolTracer();
191         if (frameType == AMQP_FRAME_TYPE) {
192             ReadableBuffer originalPayload = null;
193             if (payload !is null) {
194                 originalPayload = payload.slice();
195                 originalPayload.limit(payloadSize);
196             }
197 
198             Binary payloadBin = Binary.create(originalPayload);
199             FrameBody bd = null;
200             if (frameBody is null) {
201                 bd = EmptyFrame.INSTANCE;
202             } else {
203                 bd = cast(FrameBody) frameBody;
204             }
205 
206             TransportFrame frame = new TransportFrame(channel, bd, payloadBin);
207 
208             transport.log(TransportImpl.OUTGOING, frame);
209 
210             if (tracer !is null) {
211                 tracer.sentFrame(frame);
212             }
213         } else {
214             SaslFrameBody bd = cast(SaslFrameBody) frameBody;
215             transport.log(TransportImpl.OUTGOING, bd);
216             if (tracer !is null) {
217                 tracer.sentSaslBody(bd);
218             }
219         }
220     }
221 }