/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.proton.codec.transport.FastPathFlowType;

import hunt.collection.Collection;

import hunt.proton.amqp.Symbol;
import hunt.proton.amqp.UnsignedLong;
import hunt.proton.amqp.transport.Flow;
import hunt.proton.codec.AMQPType;
import hunt.proton.codec.DecodeException;
import hunt.proton.codec.Decoder;
import hunt.proton.codec.DecoderImpl;
import hunt.proton.codec.EncoderImpl;
import hunt.proton.codec.EncodingCodes;
import hunt.proton.codec.FastPathDescribedTypeConstructor;
import hunt.proton.codec.TypeEncoding;
import hunt.proton.codec.WritableBuffer;
import hunt.proton.codec.transport.FlowType;
import hunt.Object;
import std.concurrency : initOnce;
import hunt.Exceptions;
import hunt.logging;
import hunt.Boolean;
import std.conv : to;

/**
 * Fast-path encoder/decoder for the AMQP Flow described type.
 *
 * Reads and writes the Flow list body directly against the codec buffers and
 * delegates everything else (encoder/decoder lookup, generic encodings) to a
 * wrapped FlowType instance.
 */
class FastPathFlowType : AMQPType!(Flow), FastPathDescribedTypeConstructor!(Flow) {

    /// Numeric descriptor written in front of every encoded Flow.
    private static byte DESCRIPTOR_CODE = 0x13;

    /**
     * Descriptors under which this type is registered as a fast path: the
     * numeric code as an UnsignedLong and the symbolic name "amqp:flow:list".
     *
     * The array is created once via initOnce and shared afterwards.
     */
    static Object[] DESCRIPTORS() {
        __gshared Object[] inst;
        return initOnce!inst([UnsignedLong.valueOf(DESCRIPTOR_CODE), Symbol.valueOf("amqp:flow:list")]);
    }

    /// Generic Flow type that supplies the encoder, decoder and encodings.
    private FlowType flowType;

    /**
     * Creates the fast-path type on top of a FlowType bound to the encoder.
     *
     * Params:
     *   encoder = encoder handed to the wrapped FlowType
     */
    this(EncoderImpl encoder) {
        this.flowType = new FlowType(encoder);
    }

    /// Returns the encoder of the wrapped FlowType.
    public EncoderImpl getEncoder() {
        return flowType.getEncoder();
    }

    /// Returns the decoder of the wrapped FlowType.
    public DecoderImpl getDecoder() {
        return flowType.getDecoder();
    }

    /// Always false: Flow is not a primitive type.
    override
    public bool encodesJavaPrimitive() {
        return false;
    }

    /// Returns the runtime type information of Flow.
    override
    public TypeInfo getTypeClass() {
        return typeid(Flow);
    }

    /**
     * Returns the encoding the wrapped FlowType selects for the value.
     *
     * Params:
     *   flow = value to encode; it is cast to Flow before delegation
     */
    override
    public ITypeEncoding getEncoding(Object flow) {
        return flowType.getEncoding(cast(Flow) flow);
    }

    /// Returns the canonical encoding of the wrapped FlowType.
    override
    public TypeEncoding!(Flow) getCanonicalEncoding() {
        return flowType.getCanonicalEncoding();
    }

    /// Returns all encodings of the wrapped FlowType.
    override
    public Collection!(TypeEncoding!(Flow)) getAllEncodings() {
        return flowType.getAllEncodings();
    }

    /**
     * Decodes a Flow from the decoder's buffer.
     *
     * The next byte must be a list encoding code (LIST0, LIST8 or LIST32).
     * The list elements are then read positionally: next-incoming-id,
     * incoming-window, next-outgoing-id, outgoing-window, handle,
     * delivery-count, link-credit, available, drain, echo, properties.
     * Elements missing from the end of the list leave the corresponding
     * Flow fields at whatever `new Flow()` initialised them to.
     *
     * An unexpected encoding code is logged and treated as an empty list.
     *
     * Returns: the decoded Flow
     * Throws: IllegalStateException if the list declares more than 11 elements
     */
    override
    public Flow readValue() {
        DecoderImpl decoder = getDecoder();
        byte typeCode = decoder.getBuffer().get();

        // The size is consumed from the buffer but not used afterwards; only
        // the element count drives the decoding loop below.
        int size = 0;
        int count = 0;

        switch (typeCode) {
            case EncodingCodes.LIST0:
                // TODO - Technically invalid however old decoder also allowed this.
                break;
            case EncodingCodes.LIST8:
                // Mask to read the single-byte fields as unsigned values.
                size = (cast(int) decoder.getBuffer().get()) & 0xff;
                count = (cast(int) decoder.getBuffer().get()) & 0xff;
                break;
            case EncodingCodes.LIST32:
                size = decoder.getBuffer().getInt();
                count = decoder.getBuffer().getInt();
                break;
            default:
            {
                // Build the message explicitly: logError is not given a format
                // string, so a "%d" placeholder would not be substituted.
                // NOTE(review): decoding continues with count == 0 here, so the
                // caller gets an empty Flow; consider throwing DecodeException.
                logError("Incorrect type found in Flow encoding: " ~ to!string(typeCode));
                break;
            }
        }

        Flow flow = new Flow();

        for (int index = 0; index < count; ++index) {
            switch (index) {
                case 0:
                    flow.setNextIncomingId(decoder.readUnsignedInteger(null));
                    break;
                case 1:
                    flow.setIncomingWindow(decoder.readUnsignedInteger(null));
                    break;
                case 2:
                    flow.setNextOutgoingId(decoder.readUnsignedInteger(null));
                    break;
                case 3:
                    flow.setOutgoingWindow(decoder.readUnsignedInteger(null));
                    break;
                case 4:
                    flow.setHandle(decoder.readUnsignedInteger(null));
                    break;
                case 5:
                    flow.setDeliveryCount(decoder.readUnsignedInteger(null));
                    break;
                case 6:
                    flow.setLinkCredit(decoder.readUnsignedInteger(null));
                    break;
                case 7:
                    flow.setAvailable(decoder.readUnsignedInteger(null));
                    break;
                case 8:
                    flow.setDrain(new Boolean(decoder.readBoolean(false)));
                    break;
                case 9:
                    flow.setEcho(new Boolean(decoder.readBoolean(false)));
                    break;
                case 10:
                    flow.setProperties(cast(IObject) decoder.readMap());
                    break;
                default:
                    throw new IllegalStateException("To many entries in Flow encoding");
            }
        }

        return flow;
    }

    /// Skips the next encoded value by reading its constructor and skipping it.
    override
    public void skipValue() {
        getDecoder().readConstructor().skipValue();
    }

    /**
     * Encodes a Flow into the encoder's buffer as a described list.
     *
     * Layout written: described-type indicator, SMALLULONG marker, descriptor
     * code, list encoding code, list size, element count, then the elements.
     * The size is not known up front, so a placeholder is written first and
     * patched once all elements have been emitted.
     *
     * Params:
     *   v = value to encode; it is cast to Flow
     */
    override
    public void write(Object v) {

        Flow flow = cast(Flow) v;

        WritableBuffer buffer = getEncoder().getBuffer();
        int count = getElementCount(flow);
        byte encodingCode = deduceEncodingCode(flow, count);

        buffer.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR);
        buffer.put(EncodingCodes.SMALLULONG);
        buffer.put(DESCRIPTOR_CODE);
        buffer.put(encodingCode);

        // LIST8 uses one-byte size/count fields, LIST32 four-byte ones.
        int fieldWidth = (encodingCode == EncodingCodes.LIST8) ? 1 : 4;

        int startIndex = buffer.position();

        // Reserve space for the size and write the count of list elements.
        if (fieldWidth == 1) {
            buffer.put(cast(byte) 0);
            buffer.put(cast(byte) count);
        } else {
            buffer.putInt(0);
            buffer.putInt(count);
        }

        // Write the list elements and then compute total size written.
        for (int i = 0; i < count; ++i) {
            writeElement(flow, i);
        }

        // Move back and write the size. The size covers everything after the
        // size field itself, i.e. the count field plus the elements.
        int endIndex = buffer.position();
        int writeSize = endIndex - startIndex - fieldWidth;

        buffer.position(startIndex);
        if (fieldWidth == 1) {
            buffer.put(cast(byte) writeSize);
        } else {
            buffer.putInt(writeSize);
        }
        buffer.position(endIndex);
    }

    /**
     * Writes the list element at the given position of the Flow body.
     *
     * Params:
     *   flow  = source of the field values
     *   index = zero-based element position, 0 through 10
     *
     * Throws: IllegalArgumentException for any other index
     */
    private void writeElement(Flow flow, int index) {
        switch (index) {
            case 0:
                getEncoder().writeUnsignedInteger(flow.getNextIncomingId());
                break;
            case 1:
                getEncoder().writeUnsignedInteger(flow.getIncomingWindow());
                break;
            case 2:
                getEncoder().writeUnsignedInteger(flow.getNextOutgoingId());
                break;
            case 3:
                getEncoder().writeUnsignedInteger(flow.getOutgoingWindow());
                break;
            case 4:
                getEncoder().writeUnsignedInteger(flow.getHandle());
                break;
            case 5:
                getEncoder().writeUnsignedInteger(flow.getDeliveryCount());
                break;
            case 6:
                getEncoder().writeUnsignedInteger(flow.getLinkCredit());
                break;
            case 7:
                getEncoder().writeUnsignedInteger(flow.getAvailable());
                break;
            case 8:
                getEncoder().writeBoolean(flow.getDrain());
                break;
            case 9:
                getEncoder().writeBoolean(flow.getEcho());
                break;
            case 10:
                // NOTE(review): the properties map is not written here, yet
                // getElementCount() reports 11 elements when properties are
                // set, so the emitted list presumably ends up one element
                // short of its declared count. Needs a real map write, e.g.
                // getEncoder().writeMap(flow.getProperties()) - confirm the
                // EncoderImpl signature before enabling.
                implementationMissing(false);
                break;
            default:
                throw new IllegalArgumentException("Unknown Flow value index: " ~ to!string(index));
        }
    }

    /**
     * Determines how many list elements must be written: the position of the
     * last field that carries a value, with a minimum of 4.
     *
     * A null drain or echo flag is treated as "not set" instead of being
     * dereferenced.
     *
     * Params:
     *   flow = value about to be encoded
     *
     * Returns: element count between 4 and 11
     */
    private int getElementCount(Flow flow) {
        if (flow.getProperties() !is null) {
            return 11;
        }

        auto echo = flow.getEcho();
        if (echo !is null && echo.booleanValue()) {
            return 10;
        }

        auto drain = flow.getDrain();
        if (drain !is null && drain.booleanValue()) {
            return 9;
        }

        if (flow.getAvailable() !is null) {
            return 8;
        } else if (flow.getLinkCredit() !is null) {
            return 7;
        } else if (flow.getDeliveryCount() !is null) {
            return 6;
        } else if (flow.getHandle() !is null) {
            return 5;
        } else {
            return 4;
        }
    }

    /**
     * Chooses the list encoding: LIST8 when the Flow has no properties,
     * LIST32 otherwise.
     *
     * Params:
     *   value        = value about to be encoded
     *   elementCount = number of elements to write (currently not consulted)
     */
    private byte deduceEncodingCode(Flow value, int elementCount) {
        if (value.getProperties() is null) {
            return EncodingCodes.LIST8;
        } else {
            return EncodingCodes.LIST32;
        }
    }

    /**
     * Creates a FastPathFlowType and registers it with the decoder under each
     * descriptor from DESCRIPTORS, and with the encoder.
     *
     * Params:
     *   decoder = decoder that receives the fast-path registrations
     *   encoder = encoder the new type is bound to and registered with
     */
    public static void register(Decoder decoder, EncoderImpl encoder) {
        FastPathFlowType type = new FastPathFlowType(encoder);
        foreach (Object descriptor; DESCRIPTORS)
        {
            decoder.registerFastPath(descriptor, type);
        }
        encoder.register(type);
    }
}