1 /* 2 * hunt-proton: AMQP Protocol library for D programming language. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net/ 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module hunt.proton.codec.messaging.FastPathHeaderType; 12 13 import hunt.collection.Collection; 14 15 import hunt.proton.amqp.Symbol; 16 import hunt.proton.amqp.UnsignedLong; 17 import hunt.proton.amqp.messaging.Header; 18 import hunt.proton.codec.AMQPType; 19 import hunt.proton.codec.DecodeException; 20 import hunt.proton.codec.Decoder; 21 import hunt.proton.codec.DecoderImpl; 22 import hunt.proton.codec.EncoderImpl; 23 import hunt.proton.codec.EncodingCodes; 24 import hunt.proton.codec.FastPathDescribedTypeConstructor; 25 import hunt.proton.codec.ReadableBuffer; 26 import hunt.proton.codec.TypeEncoding; 27 import hunt.proton.codec.WritableBuffer; 28 import hunt.Exceptions; 29 30 import hunt.proton.codec.messaging.HeaderType; 31 import std.concurrency : initOnce; 32 import hunt.logging; 33 import std.conv : to; 34 35 class FastPathHeaderType : AMQPType!(Header), FastPathDescribedTypeConstructor!(Header) { 36 37 private static byte DESCRIPTOR_CODE = 0x70; 38 39 //private static Object[] DESCRIPTORS = 40 //{ 41 // UnsignedLong.valueOf(DESCRIPTOR_CODE), Symbol.valueOf("amqp:header:list"), 42 //}; 43 44 45 static Object[] DESCRIPTORS() { 46 __gshared Object[] inst; 47 return initOnce!inst([UnsignedLong.valueOf(DESCRIPTOR_CODE), Symbol.valueOf("amqp:header:list")]); 48 } 49 50 51 private HeaderType headerType; 52 53 this(EncoderImpl encoder) { 54 this.headerType = new HeaderType(encoder); 55 } 56 57 public EncoderImpl getEncoder() { 58 return headerType.getEncoder(); 59 } 60 61 public DecoderImpl getDecoder() { 62 return headerType.getDecoder(); 63 } 64 65 override 66 public Header readValue() { 67 DecoderImpl decoder = getDecoder(); 68 ReadableBuffer buffer = decoder.getBuffer(); 69 byte typeCode = decoder.getBuffer().get(); 70 71 int size = 0; 72 int count = 0; 73 74 switch (typeCode) { 75 case EncodingCodes.LIST0: 76 break; 77 case EncodingCodes.LIST8: 78 size = buffer.get() & 0xff; 79 count = buffer.get() & 0xff; 80 break; 81 case EncodingCodes.LIST32: 82 size = buffer.getInt(); 83 count = buffer.getInt(); 84 break; 85 default: 86 { 87 logError("Incorrect type found in Header encoding: %d",typeCode); 88 break; 89 } 90 // throw new DecodeException("Incorrect type found in Header encoding: " ~ typeCode); 91 } 92 93 Header header = new Header(); 94 95 for (int index = 0; index < count; ++index) { 96 switch (index) { 97 case 0: 98 header.setDurable(decoder.readBoolean(null)); 99 break; 100 case 1: 101 header.setPriority(decoder.readUnsignedByte(null)); 102 break; 103 case 2: 104 header.setTtl(decoder.readUnsignedInteger(null)); 105 break; 106 case 3: 107 header.setFirstAcquirer(decoder.readBoolean(null)); 108 break; 109 case 4: 110 header.setDeliveryCount(decoder.readUnsignedInteger(null)); 111 break; 112 default: 113 throw new IllegalStateException("To many entries in Header encoding"); 114 } 115 } 116 117 return header; 118 } 119 120 override 121 public void skipValue() { 122 //implementationMissing(false); 123 getDecoder().readConstructor().skipValue(); 124 } 125 126 override 127 public bool encodesJavaPrimitive() { 128 return false; 129 } 130 131 override 132 public TypeInfo getTypeClass() { 133 return typeid(Header); 134 } 135 136 override 137 public ITypeEncoding getEncoding(Object header) { 138 return headerType.getEncoding(cast(Header)header); 139 } 140 141 override 142 public TypeEncoding!(Header) getCanonicalEncoding() { 143 return headerType.getCanonicalEncoding(); 144 } 145 146 override 147 public Collection!(TypeEncoding!(Header)) getAllEncodings() { 148 return headerType.getAllEncodings(); 149 } 150 151 override 152 public void write(Object v) { 153 Header value = cast(Header)v; 154 WritableBuffer buffer = getEncoder().getBuffer(); 155 int count = getElementCount(value); 156 byte encodingCode = deduceEncodingCode(value, count); 157 158 buffer.put(EncodingCodes.DESCRIBED_TYPE_INDICATOR); 159 buffer.put(EncodingCodes.SMALLULONG); 160 buffer.put(DESCRIPTOR_CODE); 161 buffer.put(encodingCode); 162 163 // Optimized step, no other data to be written. 164 if (encodingCode == EncodingCodes.LIST0) { 165 return; 166 } 167 168 int fieldWidth; 169 170 if (encodingCode == EncodingCodes.LIST8) { 171 fieldWidth = 1; 172 } else { 173 fieldWidth = 4; 174 } 175 176 int startIndex = buffer.position(); 177 178 // Reserve space for the size and write the count of list elements. 179 if (fieldWidth == 1) { 180 buffer.put(cast(byte) 0); 181 buffer.put(cast(byte) count); 182 } else { 183 buffer.putInt(0); 184 buffer.putInt(count); 185 } 186 187 // Write the list elements and then compute total size written. 188 for (int i = 0; i < count; ++i) { 189 writeElement(value, i); 190 } 191 192 // Move back and write the size 193 int endIndex = buffer.position(); 194 int writeSize = endIndex - startIndex - fieldWidth; 195 196 buffer.position(startIndex); 197 if (fieldWidth == 1) { 198 buffer.put(cast(byte) writeSize); 199 } else { 200 buffer.putInt(writeSize); 201 } 202 buffer.position(endIndex); 203 } 204 205 private void writeElement(Header header, int index) { 206 switch (index) { 207 case 0: 208 getEncoder().writeBoolean(header.getDurable()); 209 break; 210 case 1: 211 getEncoder().writeUnsignedByte(header.getPriority()); 212 break; 213 case 2: 214 getEncoder().writeUnsignedInteger(header.getTtl()); 215 break; 216 case 3: 217 getEncoder().writeBoolean(header.getFirstAcquirer()); 218 break; 219 case 4: 220 getEncoder().writeUnsignedInteger(header.getDeliveryCount()); 221 break; 222 default: 223 throw new IllegalArgumentException("Unknown Header value index: " ~ to!string(index)); 224 } 225 } 226 227 private int getElementCount(Header header) { 228 if (header.getDeliveryCount() !is null) { 229 return 5; 230 } else if (header.getFirstAcquirer() !is null) { 231 return 4; 232 } else if (header.getTtl() !is null) { 233 return 3; 234 } else if (header.getPriority() !is null) { 235 return 2; 236 } else if (header.getDurable() !is null) { 237 return 1; 238 } else { 239 return 0; 240 } 241 } 242 243 private byte deduceEncodingCode(Header value, int elementCount) { 244 if (elementCount == 0) { 245 return EncodingCodes.LIST0; 246 } else { 247 return EncodingCodes.LIST8; 248 } 249 } 250 251 public static void register(Decoder decoder, EncoderImpl encoder) { 252 FastPathHeaderType type = new FastPathHeaderType(encoder); 253 //implementationMissing(false); 254 foreach(Object descriptor ; DESCRIPTORS) 255 { 256 decoder.registerFastPath(descriptor, type); 257 } 258 encoder.register(type); 259 } 260 }