1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.proton.codec.transport.FastPathFlowType;
12 
13 import hunt.collection.Collection;
14 
15 import hunt.proton.amqp.Symbol;
16 import hunt.proton.amqp.UnsignedLong;
17 import hunt.proton.amqp.transport.Flow;
18 import hunt.proton.codec.AMQPType;
19 import hunt.proton.codec.DecodeException;
20 import hunt.proton.codec.Decoder;
21 import hunt.proton.codec.DecoderImpl;
22 import hunt.proton.codec.EncoderImpl;
23 import hunt.proton.codec.EncodingCodes;
24 import hunt.proton.codec.FastPathDescribedTypeConstructor;
25 import hunt.proton.codec.TypeEncoding;
26 import hunt.proton.codec.WritableBuffer;
27 import  hunt.proton.codec.transport.FlowType;
28 import hunt.Object;
29 import std.concurrency : initOnce;
30 import hunt.Exceptions;
31 import hunt.logging;
32 import hunt.Boolean;
33 import std.conv : to;
34 
/**
 * Fast-path encoder/decoder for the AMQP Flow performative.
 *
 * Decoding reads the list body directly from the decoder's buffer and
 * populates a Flow field by field; encoding writes the described-type
 * header and the list elements straight into the encoder's buffer.
 * Everything else (encodings lookup, encoder/decoder access) is delegated
 * to a wrapped FlowType instance.
 */
class FastPathFlowType : AMQPType!(Flow), FastPathDescribedTypeConstructor!(Flow) {

    // Descriptor code written after the SMALLULONG marker in write().
    private static byte DESCRIPTOR_CODE = 0x13;

    /**
     * Descriptors under which this type is registered with the decoder:
     * the numeric code and the symbolic name "amqp:flow:list".
     *
     * The array is created on first use through std.concurrency.initOnce
     * and the same instance is returned on every subsequent call.
     */
    static Object[] DESCRIPTORS() {
        __gshared Object[] inst;
        return initOnce!inst([UnsignedLong.valueOf(DESCRIPTOR_CODE), Symbol.valueOf("amqp:flow:list")]);
    }

    // Regular (non fast-path) Flow type that all generic operations delegate to.
    private FlowType flowType;

    /**
     * Params:
     *   encoder = encoder handed to the wrapped FlowType.
     */
    this(EncoderImpl encoder) {
        this.flowType = new FlowType(encoder);
    }

    /// Returns: the encoder of the wrapped FlowType.
    public EncoderImpl getEncoder() {
        return flowType.getEncoder();
    }

    /// Returns: the decoder of the wrapped FlowType.
    public DecoderImpl getDecoder() {
        return flowType.getDecoder();
    }

    /// Always false: Flow is a composite type, not a primitive.
    override
    public bool encodesJavaPrimitive() {
        return false;
    }

    /// Returns: the runtime type information of Flow.
    override
    public TypeInfo getTypeClass() {
        return typeid(Flow);
    }

    /**
     * Params:
     *   flow = value to look up an encoding for; it is cast to Flow before
     *          being passed to the wrapped FlowType.
     */
    override
    public ITypeEncoding getEncoding(Object flow) {
        return flowType.getEncoding(cast(Flow) flow);
    }

    /// Delegates to the wrapped FlowType.
    override
    public TypeEncoding!(Flow) getCanonicalEncoding() {
        return flowType.getCanonicalEncoding();
    }

    /// Delegates to the wrapped FlowType.
    override
    public Collection!(TypeEncoding!(Flow)) getAllEncodings() {
        return flowType.getAllEncodings();
    }

    /**
     * Decodes a Flow from the decoder's buffer.
     *
     * The next byte in the buffer must be a list encoding code (LIST0,
     * LIST8 or LIST32). The list elements are then read in field order.
     *
     * Returns: the decoded Flow; fields beyond the encoded element count
     *          are left as constructed by `new Flow()`.
     * Throws:
     *   DecodeException if the encoding code is not a list type.
     *   IllegalStateException if the list declares more than 11 elements.
     */
    override
    public Flow readValue() {
        DecoderImpl decoder = getDecoder();
        byte typeCode = decoder.getBuffer().get();

        // `size` is not used afterwards; it is still read so that the
        // buffer position advances past the size field.
        int size = 0;
        int count = 0;

        switch (typeCode) {
            case EncodingCodes.LIST0:
                // TODO - Technically invalid however old decoder also allowed this.
                break;
            case EncodingCodes.LIST8:
                size = (cast(int) decoder.getBuffer().get()) & 0xff;
                count = (cast(int) decoder.getBuffer().get()) & 0xff;
                break;
            case EncodingCodes.LIST32:
                size = decoder.getBuffer().getInt();
                count = decoder.getBuffer().getInt();
                break;
            default:
                // Not a list: the buffer does not hold a Flow body here, so
                // fail instead of returning an empty Flow and leaving the
                // buffer positioned in the middle of unknown data.
                throw new DecodeException("Incorrect type found in Flow encoding: " ~ to!string(typeCode));
        }

        Flow flow = new Flow();

        for (int index = 0; index < count; ++index) {
            switch (index) {
                case 0:
                    flow.setNextIncomingId(decoder.readUnsignedInteger(null));
                    break;
                case 1:
                    flow.setIncomingWindow(decoder.readUnsignedInteger(null));
                    break;
                case 2:
                    flow.setNextOutgoingId(decoder.readUnsignedInteger(null));
                    break;
                case 3:
                    flow.setOutgoingWindow(decoder.readUnsignedInteger(null));
                    break;
                case 4:
                    flow.setHandle(decoder.readUnsignedInteger(null));
                    break;
                case 5:
                    flow.setDeliveryCount(decoder.readUnsignedInteger(null));
                    break;
                case 6:
                    flow.setLinkCredit(decoder.readUnsignedInteger(null));
                    break;
                case 7:
                    flow.setAvailable(decoder.readUnsignedInteger(null));
                    break;
                case 8:
                    flow.setDrain(new Boolean(decoder.readBoolean(false)));
                    break;
                case 9:
                    flow.setEcho(new Boolean(decoder.readBoolean(false)));
                    break;
                case 10:
                    flow.setProperties(cast(IObject) decoder.readMap());
                    break;
                default:
                    throw new IllegalStateException("To many entries in Flow encoding");
            }
        }

        return flow;
    }

    /// Skips the next value by reading its constructor and asking it to skip.
    override
    public void skipValue() {
        getDecoder().readConstructor().skipValue();
    }

    /**
     * Encodes a Flow into the encoder's buffer as a described list.
     *
     * Layout written: described-type indicator, SMALLULONG marker,
     * descriptor code, list encoding code, size, count, then the elements.
     * The size is not known up front, so a placeholder is written first and
     * patched once all elements have been emitted.
     *
     * Params:
     *   v = value to encode; it is cast to Flow.
     */
    override
    public void write(Object v) {

        Flow flow = cast(Flow) v;

        WritableBuffer buffer = getEncoder().getBuffer();
        int count = getElementCount(flow);
        byte encodingCode = deduceEncodingCode(flow, count);

        buffer.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
        buffer.put(EncodingCodes.SMALLULONG);
        buffer.put(DESCRIPTOR_CODE);
        buffer.put(encodingCode);

        // LIST8 uses one-byte size/count fields, LIST32 four-byte ones.
        int fieldWidth = (encodingCode == EncodingCodes.LIST8) ? 1 : 4;

        int startIndex = buffer.position();

        // Reserve space for the size and write the count of list elements.
        if (fieldWidth == 1) {
            buffer.put(cast(byte) 0);
            buffer.put(cast(byte) count);
        } else {
            buffer.putInt(0);
            buffer.putInt(count);
        }

        // Write the list elements and then compute total size written.
        for (int i = 0; i < count; ++i) {
            writeElement(flow, i);
        }

        // Move back and write the size. The size field itself is excluded,
        // the count field and the elements are included.
        int endIndex = buffer.position();
        int writeSize = endIndex - startIndex - fieldWidth;

        buffer.position(startIndex);
        if (fieldWidth == 1) {
            buffer.put(cast(byte) writeSize);
        } else {
            buffer.putInt(writeSize);
        }
        buffer.position(endIndex);
    }

    /**
     * Writes the list element at the given position of the Flow list.
     *
     * Params:
     *   flow  = source of the field values.
     *   index = zero-based position in the Flow list (0 .. 10).
     * Throws: IllegalArgumentException for an index outside 0 .. 10.
     */
    private void writeElement(Flow flow, int index) {
        switch (index) {
            case 0:
                getEncoder().writeUnsignedInteger(flow.getNextIncomingId());
                break;
            case 1:
                getEncoder().writeUnsignedInteger(flow.getIncomingWindow());
                break;
            case 2:
                getEncoder().writeUnsignedInteger(flow.getNextOutgoingId());
                break;
            case 3:
                getEncoder().writeUnsignedInteger(flow.getOutgoingWindow());
                break;
            case 4:
                getEncoder().writeUnsignedInteger(flow.getHandle());
                break;
            case 5:
                getEncoder().writeUnsignedInteger(flow.getDeliveryCount());
                break;
            case 6:
                getEncoder().writeUnsignedInteger(flow.getLinkCredit());
                break;
            case 7:
                getEncoder().writeUnsignedInteger(flow.getAvailable());
                break;
            case 8:
                getEncoder().writeBoolean(flow.getDrain());
                break;
            case 9:
                getEncoder().writeBoolean(flow.getEcho());
                break;
            case 10:
                // NOTE(review): nothing is written for the properties map
                // here, although getElementCount() reports 11 elements when
                // properties are set. Unless implementationMissing() throws,
                // the emitted list would be one element short of its
                // declared count - confirm and implement map encoding.
                implementationMissing(false);
                //getEncoder().writeMap(flow.getProperties());
                break;
            default:
                throw new IllegalArgumentException("Unknown Flow value index: " ~ to!string(index));
        }
    }

    /**
     * Determines how many list elements must be written: the position of
     * the last field that carries a value, plus one. The first four fields
     * are always written.
     *
     * The drain and echo flags are null-checked before being unwrapped so
     * that an unset flag is treated like `false` instead of dereferencing
     * null.
     */
    private int getElementCount(Flow flow) {
        if (flow.getProperties() !is null) {
            return 11;
        }

        auto echo = flow.getEcho();
        if (echo !is null && echo.booleanValue()) {
            return 10;
        }

        auto drain = flow.getDrain();
        if (drain !is null && drain.booleanValue()) {
            return 9;
        }

        if (flow.getAvailable() !is null) {
            return 8;
        } else if (flow.getLinkCredit() !is null) {
            return 7;
        } else if (flow.getDeliveryCount() !is null) {
            return 6;
        } else if (flow.getHandle() !is null) {
            return 5;
        } else {
            return 4;
        }
    }

    /**
     * Chooses the list encoding: LIST8 when the Flow has no properties,
     * LIST32 otherwise.
     *
     * Params:
     *   value        = Flow about to be encoded.
     *   elementCount = number of elements to be written (currently unused).
     */
    private byte deduceEncodingCode(Flow value, int elementCount) {
        if (value.getProperties() is null) {
            return EncodingCodes.LIST8;
        } else {
            return EncodingCodes.LIST32;
        }
    }

    /**
     * Creates a FastPathFlowType and registers it with the decoder (once
     * per descriptor) and with the encoder.
     *
     * Params:
     *   decoder = decoder to register the fast path with.
     *   encoder = encoder used to build the type and to register it with.
     */
    public static void register(Decoder decoder, EncoderImpl encoder) {
        FastPathFlowType type = new FastPathFlowType(encoder);
        foreach (Object descriptor; DESCRIPTORS()) {
            decoder.registerFastPath(descriptor, type);
        }
        encoder.register(type);
    }
}