1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.FrameParser;
13 
14 import  hunt.proton.engine.impl.AmqpHeader;
15 import  hunt.proton.engine.impl.ByteBufferUtils;
16 
17 import hunt.io.ByteBuffer;
18 import hunt.io.BufferUtils;
19 import hunt.proton.amqp.Binary;
20 import hunt.proton.amqp.transport.EmptyFrame;
21 import hunt.proton.amqp.transport.FrameBody;
22 import hunt.proton.codec.ByteBufferDecoder;
23 import hunt.proton.codec.DecodeException;
24 import hunt.proton.engine.Transport;
25 import hunt.proton.engine.TransportException;
26 import hunt.proton.framing.TransportFrame;
27 import hunt.proton.engine.impl.TransportInput;
28 import hunt.proton.engine.impl.FrameHandler;
29 import hunt.proton.engine.impl.TransportImpl;
30 import std.concurrency : initOnce;
31 import hunt.proton.engine.impl.ProtocolTracer;
32 import hunt.logging;
33 import hunt.String;
34 
35 class FrameParser : TransportInput
36 {
    // Label reported to the transport log and to the protocol tracer by
    // logHeader() once a complete protocol header has been matched.
    private static string HEADER_DESCRIPTION = "AMQP";
38 
39     //private static ByteBuffer _emptyInputBuffer = newWriteableBuffer(0);
40 
41 
42    static ByteBuffer  _emptyInputBuffer() {
43        __gshared ByteBuffer  inst;
44        return initOnce!inst(ByteBufferUtils.newWriteableBuffer(0));
45    }
46 
    /**
     * States of the incremental parser driven by input().
     *
     * The parser is resumable: input() stores the state it stopped in and
     * continues from there on the next call.
     */
    enum State
    {
        // HEADER0..HEADER7: waiting for byte N of the protocol header; each
        // byte is compared against AmqpHeader.HEADER[N].
        HEADER0,
        HEADER1,
        HEADER2,
        HEADER3,
        HEADER4,
        HEADER5,
        HEADER6,
        HEADER7,
        // SIZE_0..SIZE_3: waiting for byte N of the 4-byte frame size
        // (most significant byte first).
        SIZE_0,
        SIZE_1,
        SIZE_2,
        SIZE_3,
        // Frame size is known; it is validated before the frame is read.
        PRE_PARSE,
        // The frame is split across inputs and is being accumulated in _frameBuffer.
        BUFFERING,
        // A complete frame is available and is being decoded.
        PARSING,
        // A parsing error occurred; the input() loop stops consuming data.
        ERROR
    }
66 
    // Receives decoded frames (handleFrame) and close notifications (closed).
    private FrameHandler _frameHandler;
    // Decodes the frame body from the frame bytes.
    private ByteBufferDecoder _decoder;
    // Size used when _inputBuffer is allocated lazily in tail(); also the
    // capacity reported while no buffer exists.
    private int _inputBufferSize;
    // Upper bound on the stated frame size; only enforced when greater than zero.
    private int _localMaxFrameSize;
    // Used by logHeader() for frame tracing.
    private TransportImpl _transport;

    // Buffer handed out by tail() for the caller to write into; may be null.
    private ByteBuffer _inputBuffer = null;
    // True once the tail is closed: by close_tail(), by the value returned
    // from handleFrame(), or after a parsing error.
    private bool _tail_closed = false;

    // State the parser resumes from on the next input() call.
    private State _state = State.HEADER0;

    // Number of frames that reached the decoding step.
    private long _framesInput = 0;

    /** the stated size of the current frame */
    private int _size;

    /** holds the current frame that is being parsed */
    private ByteBuffer _frameBuffer;

    // Frame decoded while the handler was not handling frames; delivered
    // later by flushHeldFrame().
    private TransportFrame _heldFrame;
    // Last parsing error recorded by input().
    private TransportException _parsingError;
88 
89 
90     /**
91      * We store the last result when processing input so that
92      * we know not to process any more input if it was an error.
93      */
94     this(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize, TransportImpl transport)
95     {
96         _frameHandler = frameHandler;
97         _decoder = decoder;
98         _localMaxFrameSize = localMaxFrameSize;
99         _inputBufferSize = _localMaxFrameSize > 0 ? _localMaxFrameSize : 16*1024;
100         _transport = transport;
101     }
102 
103     private void input(ByteBuffer inbuf)
104     {
105         //logError("inbuf : %s",inbuf.array());
106        // ByteBuffer tmp = inbuf;
107       //  logError("inbuf : %s",inbuf.getRemaining());
108         flushHeldFrame();
109         if (_heldFrame !is null)
110         {
111             return;
112         }
113 
114         TransportException frameParsingError = null;
115         int size = _size;
116         State state = _state;
117         ByteBuffer oldIn = null;
118 
119         bool transportAccepting = true;
120 
121         while(inbuf.hasRemaining() && state != State.ERROR && transportAccepting)
122         {
123             switch(state)
124             {
125                 case State.HEADER0:
126                     if(inbuf.hasRemaining())
127                     {
128                         byte c = inbuf.get();
129                         if(c != AmqpHeader.HEADER[0])
130                         {
131                             frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s");
132                             state = State.ERROR;
133                             break;
134                         }
135                         state = State.HEADER1;
136                         goto case;
137                     }
138                     else
139                     {
140                         break;
141                     }
142                 case State.HEADER1:
143                     if(inbuf.hasRemaining())
144                     {
145                         byte c = inbuf.get();
146                         if(c != AmqpHeader.HEADER[1])
147                         {
148                             frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s");
149                             state = State.ERROR;
150                             break;
151                         }
152                         state = State.HEADER2;
153                         goto case;
154                     }
155                     else
156                     {
157                         break;
158                     }
159                 case State.HEADER2:
160                     if(inbuf.hasRemaining())
161                     {
162                         byte c = inbuf.get();
163                         if(c != AmqpHeader.HEADER[2])
164                         {
165                             frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s");
166                             state = State.ERROR;
167                             break;
168                         }
169                         state = State.HEADER3;
170                         goto case;
171                     }
172                     else
173                     {
174                         break;
175                     }
176                 case State.HEADER3:
177                     if(inbuf.hasRemaining())
178                     {
179                         byte c = inbuf.get();
180                         if(c != AmqpHeader.HEADER[3])
181                         {
182                             frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s");
183                             state = State.ERROR;
184                             break;
185                         }
186                         state = State.HEADER4;
187                         goto case;
188                     }
189                     else
190                     {
191                         break;
192                     }
193                 case State.HEADER4:
194                     if(inbuf.hasRemaining())
195                     {
196                         byte c = inbuf.get();
197                         if(c != AmqpHeader.HEADER[4])
198                         {
199                             frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s");
200                             state = State.ERROR;
201                             break;
202                         }
203                         state = State.HEADER5;
204                         goto case;
205                     }
206                     else
207                     {
208                         break;
209                     }
210                 case State.HEADER5:
211                     if(inbuf.hasRemaining())
212                     {
213                         byte c = inbuf.get();
214                         if(c != AmqpHeader.HEADER[5])
215                         {
216                             frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s");
217                             state = State.ERROR;
218                             break;
219                         }
220                         state = State.HEADER6;
221                         goto case;
222                     }
223                     else
224                     {
225                         break;
226                     }
227                 case State.HEADER6:
228                     if(inbuf.hasRemaining())
229                     {
230                         byte c = inbuf.get();
231                         if(c != AmqpHeader.HEADER[6])
232                         {
233                             frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s");
234                             state = State.ERROR;
235                             break;
236                         }
237                         state = State.HEADER7;
238                         goto case;
239                     }
240                     else
241                     {
242                         break;
243                     }
244                 case State.HEADER7:
245                     if(inbuf.hasRemaining())
246                     {
247                         byte c = inbuf.get();
248                         if(c != AmqpHeader.HEADER[7])
249                         {
250                             frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s");
251                             state = State.ERROR;
252                             break;
253                         }
254 
255                         logHeader();
256 
257                         state = State.SIZE_0;
258                         goto case;
259                     }
260                     else
261                     {
262                         break;
263                     }
264                 case State.SIZE_0:
265                     if(!inbuf.hasRemaining())
266                     {
267                         break;
268                     }
269                     if(inbuf.remaining() >= 4)
270                     {
271                         size = inbuf.getInt();
272                         state = State.PRE_PARSE;
273                         break;
274                     }
275                     else
276                     {
277                         size = (inbuf.get() << 24) & 0xFF000000;
278                         if(!inbuf.hasRemaining())
279                         {
280                             state = State.SIZE_1;
281                             break;
282                         }
283                     }
284                     goto case;
285                 case State.SIZE_1:
286                     size |= (inbuf.get() << 16) & 0xFF0000;
287                     if(!inbuf.hasRemaining())
288                     {
289                         state = State.SIZE_2;
290                         break;
291                     }
292                     goto case;
293                 case State.SIZE_2:
294                     size |= (inbuf.get() << 8) & 0xFF00;
295                     if(!inbuf.hasRemaining())
296                     {
297                         state = State.SIZE_3;
298                         break;
299                     }
300                     goto case;
301                 case State.SIZE_3:
302                     size |= inbuf.get() & 0xFF;
303                     state = State.PRE_PARSE;
304                     goto case;
305                 case State.PRE_PARSE:
306                     if(size < 8)
307                     {
308                         frameParsingError = new TransportException("specified frame size %d smaller than minimum frame header ");
309                         state = State.ERROR;
310                         break;
311                     }
312 
313                     if (_localMaxFrameSize > 0 && size > _localMaxFrameSize)
314                     {
315                         frameParsingError = new TransportException("specified frame size %d greater than maximum valid frame size %d");
316                         state = State.ERROR;
317                         break;
318                     }
319 
320                     if(inbuf.remaining() < size-4)
321                     {
322                         _frameBuffer = BufferUtils.allocate(size-4);
323                         _frameBuffer.put(inbuf);
324                         state = State.BUFFERING;
325                         break;
326                     }
327                     goto case;
328                 case State.BUFFERING:
329                     if(_frameBuffer !is null)
330                     {
331                         if(inbuf.remaining() < _frameBuffer.remaining())
332                         {
333                             _frameBuffer.put(inbuf);
334                             break;
335                         }
336                         else
337                         {
338                             ByteBuffer dup = inbuf.duplicate();
339                             dup.limit(dup.position()+_frameBuffer.remaining());
340                             inbuf.position(inbuf.position()+_frameBuffer.remaining());
341                             _frameBuffer.put(dup);
342                             oldIn = inbuf;
343                             _frameBuffer.flip();
344                             inbuf = _frameBuffer;
345                             state = State.PARSING;
346                         }
347                     }
348                     goto case;
349                 case State.PARSING:
350 
351                     int dataOffset = (inbuf.get() << 2) & 0x3FF;
352 
353                     if(dataOffset < 8)
354                     {
355                         frameParsingError = new TransportException("specified frame data offset %d smaller than minimum frame header size ");
356                         state = State.ERROR;
357                         break;
358                     }
359                     else if(dataOffset > size)
360                     {
361                         frameParsingError = new TransportException("specified frame data offset %d larger than the frame size ");
362                         state = State.ERROR;
363                         break;
364                     }
365 
366                     // type
367 
368                     int type = inbuf.get() & 0xFF;
369                     int channel = inbuf.getShort() & 0xFFFF;
370 
371                     if(type != 0)
372                     {
373                         frameParsingError = new TransportException("unknown frame type");
374                         state = State.ERROR;
375                         break;
376                     }
377 
378                     // note that this skips over the extended header if it's present
379                     if(dataOffset!=8)
380                     {
381                         inbuf.position(inbuf.position()+dataOffset-8);
382                     }
383 
384                     // oldIn null iff not working on duplicated buffer
385                     int frameBodySize = size - dataOffset;
386                     if(oldIn is null)
387                     {
388                         oldIn = inbuf;
389                         inbuf = inbuf.duplicate();
390                         int endPos = inbuf.position() + frameBodySize;
391                         inbuf.limit(endPos);
392                         oldIn.position(endPos);
393 
394                     }
395 
396                     try
397                     {
398                         _framesInput += 1;
399 
400                         Binary payload = null;
401                         Object val = null;
402 
403                         if (frameBodySize > 0)
404                         {
405                             _decoder.setByteBuffer(inbuf);
406                             val = _decoder.readObject();
407                             _decoder.setByteBuffer(null);
408 
409                             if(inbuf.hasRemaining())
410                             {
411                                 byte[] payloadBytes = new byte[inbuf.remaining()];
412                                 inbuf.get(payloadBytes);
413                                 payload = new Binary(payloadBytes);
414                             }
415                             else
416                             {
417                                 payload = null;
418                             }
419                         }
420                         else
421                         {
422                             val = EmptyFrame.INSTANCE;
423                         }
424 
425                         FrameBody frameBody = cast(FrameBody) val;
426                         if(frameBody !is null)
427                         {
428                             version(HUNT_AMQP_DEBUG) {
429                                 tracef("IN: CH[%d] : %s, %s", channel, frameBody, 
430                                     payload is null ? "" : ", [" ~ payload.toString() ~ "]");
431                             }
432                             TransportFrame frame = new TransportFrame(channel, frameBody, payload);
433 
434                             if(_frameHandler.isHandlingFrames())
435                             {
436                                 _tail_closed = _frameHandler.handleFrame(frame);
437                             }
438                             else
439                             {
440                                 transportAccepting = false;
441                                 _heldFrame = frame;
442                             }
443                         }
444                         else
445                         {
446                             logError("Frameparser encountered a null");
447                             //throw new TransportException("Frameparser encountered a "
448                             //        + (val is null? "null" : val.getClass())
449                             //        + " which is not a " + FrameBody.class);
450                         }
451 
452                         reset();
453                         inbuf = oldIn;
454                         oldIn = null;
455                         _frameBuffer = null;
456                         state = State.SIZE_0;
457                     }
458                     catch (DecodeException ex)
459                     {
460                         state = State.ERROR;
461                         frameParsingError = new TransportException(ex);
462                     }
463                     break;
464                 case State.ERROR:
465                     break;
466                     // do nothing
467                 default:
468                     break;
469             }
470 
471         }
472 
473         if (_tail_closed)
474         {
475             if (inbuf.hasRemaining()) {
476                 state = State.ERROR;
477                 frameParsingError = new TransportException("framing error");
478             } else if (state != State.SIZE_0) {
479                 state = State.ERROR;
480                 frameParsingError = new TransportException("connection aborted");
481             } else {
482                 _frameHandler.closed(null);
483             }
484         }
485 
486         _state = state;
487         _size = size;
488 
489         if(_state == State.ERROR)
490         {
491             _tail_closed = true;
492             if(frameParsingError !is null)
493             {
494                 _parsingError = frameParsingError;
495                 _frameHandler.closed(frameParsingError);
496             }
497             else
498             {
499                 throw new TransportException("Unable to parse, probably because of a previous error");
500             }
501         }
502     }
503 
504     override
505     public int capacity()
506     {
507         if (_tail_closed) {
508             return Transport.END_OF_STREAM;
509         } else {
510             if (_inputBuffer !is null) {
511                 return _inputBuffer.remaining();
512             } else {
513                 return _inputBufferSize;
514             }
515         }
516     }
517 
518     override
519     public int position() {
520         if (_tail_closed) {
521             return Transport.END_OF_STREAM;
522         }
523         return (_inputBuffer is null) ? 0 : _inputBuffer.position();
524     }
525 
526     override
527     public ByteBuffer tail()
528     {
529         if (_tail_closed) {
530             throw new TransportException("tail closed");
531         }
532 
533         if (_inputBuffer is null) {
534             _inputBuffer = ByteBufferUtils.newWriteableBuffer(_inputBufferSize);
535         }
536 
537         return _inputBuffer;
538     }
539 
540     override
541     public void process()
542     {
543         if (_inputBuffer !is null)
544         {
545             _inputBuffer.flip();
546 
547             try
548             {
549                 input(_inputBuffer);
550             }
551             finally
552             {
553                 if (_inputBuffer.hasRemaining()) {
554                     _inputBuffer.compact();
555                 } else if (_inputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) {
556                     _inputBuffer = null;
557                 } else {
558                     _inputBuffer.clear();
559                 }
560             }
561         }
562         else
563         {
564             input(_emptyInputBuffer);
565         }
566     }
567 
568     override
569     public void close_tail()
570     {
571         _tail_closed = true;
572         process();
573     }
574 
575     /**
576      * Attempt to flush any cached data to the frame transport.  This function
577      * is useful if the {@link FrameHandler} state has changed.
578      */
579     public void flush()
580     {
581         flushHeldFrame();
582 
583         if (_heldFrame is null)
584         {
585             process();
586         }
587     }
588 
589     private void flushHeldFrame()
590     {
591         if(_heldFrame !is null && _frameHandler.isHandlingFrames())
592         {
593             _tail_closed = _frameHandler.handleFrame(_heldFrame);
594             _heldFrame = null;
595         }
596     }
597 
598     private void reset()
599     {
600         _size = 0;
601         _state = State.SIZE_0;
602     }
603 
604     long getFramesInput()
605     {
606         return _framesInput;
607     }
608 
609     private void logHeader() {
610         if (_transport.isFrameTracingEnabled()) {
611             _transport.log(TransportImpl.INCOMING, new String(HEADER_DESCRIPTION));
612 
613             ProtocolTracer tracer = _transport.getProtocolTracer();
614             if (tracer !is null) {
615                 tracer.receivedHeader(HEADER_DESCRIPTION);
616             }
617         }
618     }
619 }