1 /*
2  * hunt-proton: AMQP Protocol library for D programming language.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net/
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module hunt.proton.engine.impl.DeliveryImpl;
13 
14 
15 import hunt.proton.amqp.Binary;
16 import hunt.proton.amqp.transport.DeliveryState;
17 import hunt.proton.codec.CompositeReadableBuffer;
18 import hunt.proton.codec.ReadableBuffer;
19 import hunt.proton.codec.WritableBuffer;
20 import hunt.proton.engine.Delivery;
21 import hunt.proton.engine.Record;
22 import hunt.proton.engine.Transport;
23 import hunt.proton.engine.impl.LinkImpl;
24 import hunt.proton.engine.impl.TransportDelivery;
25 import hunt.proton.engine.impl.SenderImpl;
26 import hunt.proton.engine.impl.ReceiverImpl;
27 import std.algorithm;
28 import hunt.proton.engine.impl.RecordImpl;
29 import std.concurrency : initOnce;
30 
31 
32 
33 class DeliveryImpl : Delivery
34 {
35     public static int DEFAULT_MESSAGE_FORMAT = 0;
36 
37    // private static ReadableBuffer EMPTY_BUFFER = ByteBufferReader.allocate(0);
38 
39     static ReadableBuffer EMPTY_BUFFER()
40     {
41         __gshared ReadableBuffer inst;
42         return initOnce!inst(ByteBufferReader.allocate(0));
43     }
44 
45     private DeliveryImpl _linkPrevious;
46     private DeliveryImpl _linkNext;
47 
48     private DeliveryImpl _workNext;
49     private DeliveryImpl _workPrev;
50     bool _work;
51 
52     private DeliveryImpl _transportWorkNext;
53     private DeliveryImpl _transportWorkPrev;
54     bool _transportWork;
55 
56     private Record _attachments;
57     private Object _context;
58 
59     private byte[] _tag;
60     private LinkImpl _link;
61     private DeliveryState _deliveryState;
62     private bool _settled;
63     private bool _remoteSettled;
64     private DeliveryState _remoteDeliveryState;
65     private DeliveryState _defaultDeliveryState = null;
66     private int _messageFormat ; //= DEFAULT_MESSAGE_FORMAT;
67 
68     /**
69      * A bit-mask representing the outstanding work on this delivery received from the transport layer
70      * that has not yet been processed by the application.
71      */
72     private int _flags = cast(byte) 0;
73 
74     private TransportDelivery _transportDelivery;
75     private bool _complete;
76     private bool _updated;
77     private bool _done;
78     private bool _aborted;
79 
80     private CompositeReadableBuffer _dataBuffer;
81     private ReadableBuffer _dataView;
82 
83     this(byte[] tag, LinkImpl link, DeliveryImpl previous)
84     {
85         _tag = tag;
86         _link = link;
87         _link.incrementUnsettled();
88         _linkPrevious = previous;
89         _messageFormat = DEFAULT_MESSAGE_FORMAT;
90         if (previous !is null)
91         {
92             previous._linkNext = this;
93         }
94     }
95 
96     
97     public byte[] getTag()
98     {
99         return _tag;
100     }
101 
102     
103     public LinkImpl getLink()
104     {
105         return _link;
106     }
107 
108     
109     public DeliveryState getLocalState()
110     {
111         return _deliveryState;
112     }
113 
114     
115     public DeliveryState getRemoteState()
116     {
117         return _remoteDeliveryState;
118     }
119 
120     
121     public bool remotelySettled()
122     {
123         return _remoteSettled;
124     }
125 
126     
127     public void setMessageFormat(int messageFormat)
128     {
129         _messageFormat = messageFormat;
130     }
131 
132     
133     public int getMessageFormat()
134     {
135         return _messageFormat;
136     }
137 
138     
139     public void disposition(DeliveryState state)
140     {
141         _deliveryState = state;
142         if(!_remoteSettled && !_settled)
143         {
144             addToTransportWorkList();
145         }
146     }
147 
148     
149     public void settle()
150     {
151         if (_settled) {
152             return;
153         }
154 
155         _settled = true;
156         _link.decrementUnsettled();
157         if(!_remoteSettled)
158         {
159             addToTransportWorkList();
160         }
161         else
162         {
163             _transportDelivery.settled();
164         }
165 
166         if(_link.current() is this)
167         {
168             _link.advance();
169         }
170 
171         _link.remove(this);
172         if(_linkPrevious !is null)
173         {
174             _linkPrevious._linkNext = _linkNext;
175         }
176 
177         if(_linkNext !is null)
178         {
179             _linkNext._linkPrevious = _linkPrevious;
180         }
181 
182         updateWork();
183 
184         _linkNext= null;
185         _linkPrevious = null;
186     }
187 
188     DeliveryImpl getLinkNext()
189     {
190         return _linkNext;
191     }
192 
193     
194     public DeliveryImpl next()
195     {
196         return getLinkNext();
197     }
198 
199     
200     public void free()
201     {
202         settle();
203     }
204 
205     DeliveryImpl getLinkPrevious()
206     {
207         return _linkPrevious;
208     }
209 
210     
211     public DeliveryImpl getWorkNext()
212     {
213         if (_workNext !is null)
214             return _workNext;
215         // the following hack is brought to you by the C implementation!
216         if (!_work)  // not on the work list
217             return (_link.getConnectionImpl()).getWorkHead();
218         return null;
219     }
220 
221     DeliveryImpl getWorkPrev()
222     {
223         return _workPrev;
224     }
225 
226     void setWorkNext(DeliveryImpl workNext)
227     {
228         _workNext = workNext;
229     }
230 
231     void setWorkPrev(DeliveryImpl workPrev)
232     {
233         _workPrev = workPrev;
234     }
235 
236     int recv(byte[] bytes, int offset, int size)
237     {
238         int consumed;
239         if (_dataBuffer !is null && _dataBuffer.hasRemaining())
240         {
241             consumed = min(size, _dataBuffer.remaining());
242 
243             _dataBuffer.get(bytes, offset, consumed);
244             _dataBuffer.reclaimRead();
245         }
246         else
247         {
248             consumed = 0;
249         }
250 
251         return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed;  //TODO - Implement
252     }
253 
254     int recv(WritableBuffer buffer)
255     {
256         int consumed;
257         if (_dataBuffer !is null && _dataBuffer.hasRemaining())
258         {
259             consumed = min(buffer.remaining(), _dataBuffer.remaining());
260             buffer.put(_dataBuffer);
261             _dataBuffer.reclaimRead();
262         }
263         else
264         {
265             consumed = 0;
266         }
267 
268         return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed;
269     }
270 
271     ReadableBuffer recv()
272     {
273         ReadableBuffer result = _dataView;
274         if (_dataView !is null)
275         {
276             _dataView = _dataBuffer = null;
277         }
278         else
279         {
280             result = EMPTY_BUFFER;
281         }
282 
283         return result;
284     }
285 
286     void updateWork()
287     {
288         getLink().getConnectionImpl().workUpdate(this);
289     }
290 
291     DeliveryImpl clearTransportWork()
292     {
293         DeliveryImpl next = _transportWorkNext;
294         getLink().getConnectionImpl().removeTransportWork(this);
295         return next;
296     }
297 
298     void addToTransportWorkList()
299     {
300         getLink().getConnectionImpl().addTransportWork(this);
301     }
302 
303     DeliveryImpl getTransportWorkNext()
304     {
305         return _transportWorkNext;
306     }
307 
308     DeliveryImpl getTransportWorkPrev()
309     {
310         return _transportWorkPrev;
311     }
312 
313     void setTransportWorkNext(DeliveryImpl transportWorkNext)
314     {
315         _transportWorkNext = transportWorkNext;
316     }
317 
318     void setTransportWorkPrev(DeliveryImpl transportWorkPrev)
319     {
320         _transportWorkPrev = transportWorkPrev;
321     }
322 
323     TransportDelivery getTransportDelivery()
324     {
325         return _transportDelivery;
326     }
327 
328     void setTransportDelivery(TransportDelivery transportDelivery)
329     {
330         _transportDelivery = transportDelivery;
331     }
332 
333     
334     public bool isSettled()
335     {
336         return _settled;
337     }
338 
339     int send(byte[] bytes, int offset, int length)
340     {
341         byte[] copy = new byte[length];
342         //System.arraycopy(bytes, offset, copy, 0, length);
343         copy[0 .. length] = bytes[offset .. offset+length];
344         getOrCreateDataBuffer().append(copy);
345         addToTransportWorkList();
346         return length;
347     }
348 
349     int send(ReadableBuffer buffer)
350     {
351         int length = buffer.remaining();
352         getOrCreateDataBuffer().append(copyContents(buffer));
353         addToTransportWorkList();
354         return length;
355     }
356 
357     int sendNoCopy(ReadableBuffer buffer)
358     {
359         int length = buffer.remaining();
360 
361         if (_dataView is null || !_dataView.hasRemaining())
362         {
363             _dataView = buffer;
364         }
365         else
366         {
367             consolidateSendBuffers(buffer);
368         }
369 
370         addToTransportWorkList();
371         return length;
372     }
373 
374     private byte[] copyContents(ReadableBuffer buffer)
375     {
376         byte[] copy = new byte[buffer.remaining()];
377 
378         if (buffer.hasArray())
379         {
380            // System.arraycopy(buffer.array(), buffer.arrayOffset() + buffer.position(), copy, 0, buffer.remaining());
381             copy[0 .. buffer.remaining()] = buffer.array()[buffer.arrayOffset() + buffer.position() ..  buffer.arrayOffset() + buffer.position()+buffer.remaining()];
382             buffer.position(buffer.limit());
383         }
384         else
385         {
386             buffer.get(copy, 0, buffer.remaining());
387         }
388 
389         return copy;
390     }
391 
392     private void consolidateSendBuffers(ReadableBuffer buffer)
393     {
394         if (_dataView == _dataBuffer)
395         {
396             getOrCreateDataBuffer().append(copyContents(buffer));
397         }
398         else
399         {
400             ReadableBuffer oldView = _dataView;
401 
402             CompositeReadableBuffer dataBuffer = getOrCreateDataBuffer();
403             dataBuffer.append(copyContents(oldView));
404             dataBuffer.append(copyContents(buffer));
405 
406             oldView.reclaimRead();
407         }
408 
409         buffer.reclaimRead();  // A pooled buffer could release now.
410     }
411 
412     void append(Binary payload)
413     {
414         byte[] data = payload.getArray();
415 
416         // The Composite buffer cannot handle composites where the array
417         // is a view of a larger array so we must copy the payload into
418         // an array of the exact size
419         if (payload.getArrayOffset() > 0 || payload.getLength() < data.length)
420         {
421             data = new byte[payload.getLength()];
422            // System.arraycopy(payload.getArray(), payload.getArrayOffset(), data, 0, payload.getLength());
423             data[0 .. payload.getLength()] = payload.getArray()[payload.getArrayOffset() .. payload.getArrayOffset()+payload.getLength()];
424         }
425 
426         getOrCreateDataBuffer().append(data);
427     }
428 
429     private CompositeReadableBuffer getOrCreateDataBuffer()
430     {
431         if (_dataBuffer is null)
432         {
433             _dataView = _dataBuffer = new CompositeReadableBuffer();
434         }
435 
436         return _dataBuffer;
437     }
438 
439     void append(byte[] data)
440     {
441         getOrCreateDataBuffer().append(data);
442     }
443 
444     void afterSend()
445     {
446         if (_dataView !is null)
447         {
448             _dataView.reclaimRead();
449             if (!_dataView.hasRemaining())
450             {
451                 _dataView = _dataBuffer;
452             }
453         }
454     }
455 
456     ReadableBuffer getData()
457     {
458         return _dataView is null ? EMPTY_BUFFER : _dataView;
459     }
460 
461     int getDataLength()
462     {
463         return _dataView is null ? 0 : _dataView.remaining();
464     }
465 
466     
467     public int available()
468     {
469         return _dataView is null ? 0 : _dataView.remaining();
470     }
471 
472     
473     public bool isWritable()
474     {
475         return  cast(SenderImpl)getLink() !is null
476                 && getLink().current() is this
477                 && (cast(SenderImpl) getLink()).hasCredit();
478     }
479 
480     
481     public bool isReadable()
482     {
483         return   cast(ReceiverImpl)getLink() !is null
484             && getLink().current() is this;
485     }
486 
487     void setComplete()
488     {
489         _complete = true;
490     }
491 
492     void setAborted()
493     {
494         _aborted = true;
495     }
496 
497     
498     public bool isAborted()
499     {
500         return _aborted;
501     }
502 
503     
504     public bool isPartial()
505     {
506         return !_complete;
507     }
508 
509     void setRemoteDeliveryState(DeliveryState remoteDeliveryState)
510     {
511         _remoteDeliveryState = remoteDeliveryState;
512         _updated = true;
513     }
514 
515     
516     public bool isUpdated()
517     {
518         return _updated;
519     }
520 
521     
522     public void clear()
523     {
524         _updated = false;
525         getLink().getConnectionImpl().workUpdate(this);
526     }
527 
528     void setDone()
529     {
530         _done = true;
531     }
532 
533     bool isDone()
534     {
535         return _done;
536     }
537 
538     void setRemoteSettled(bool remoteSettled)
539     {
540         _remoteSettled = remoteSettled;
541         _updated = true;
542     }
543 
544     
545     public bool isBuffered()
546     {
547         if (_remoteSettled) return false;
548         if ( cast(SenderImpl)getLink() !is null) {
549             if (isDone()) {
550                 return false;
551             } else {
552                 bool hasRemaining = false;
553                 if (_dataView !is null) {
554                     hasRemaining = _dataView.hasRemaining();
555                 }
556 
557                 return _complete || hasRemaining;
558             }
559         } else {
560             return false;
561         }
562     }
563 
564     
565     public Object getContext()
566     {
567         return _context;
568     }
569 
570     
571     public void setContext(Object context)
572     {
573         _context = context;
574     }
575 
576     
577     public Record attachments()
578     {
579         if(_attachments is null)
580         {
581             _attachments = new RecordImpl();
582         }
583 
584         return _attachments;
585     }
586 
587     //
588     //public String toString()
589     //{
590     //    StringBuilder builder = new StringBuilder();
591     //    builder.append("DeliveryImpl [_tag=").append(Arrays.toString(_tag))
592     //        .append(", _link=").append(_link)
593     //        .append(", _deliveryState=").append(_deliveryState)
594     //        .append(", _settled=").append(_settled)
595     //        .append(", _remoteSettled=").append(_remoteSettled)
596     //        .append(", _remoteDeliveryState=").append(_remoteDeliveryState)
597     //        .append(", _flags=").append(_flags)
598     //        .append(", _defaultDeliveryState=").append(_defaultDeliveryState)
599     //        .append(", _transportDelivery=").append(_transportDelivery)
600     //        .append(", _data Size=").append(getDataLength())
601     //        .append(", _complete=").append(_complete)
602     //        .append(", _updated=").append(_updated)
603     //        .append(", _done=").append(_done)
604     //        .append("]");
605     //    return builder.toString();
606     //}
607 
608     
609     public int pending()
610     {
611         return _dataView is null ? 0 : _dataView.remaining();
612     }
613 
614     
615     public void setDefaultDeliveryState(DeliveryState state)
616     {
617         _defaultDeliveryState = state;
618     }
619 
620     
621     public DeliveryState getDefaultDeliveryState()
622     {
623         return _defaultDeliveryState;
624     }
625 }