/*
 * hunt-proton: AMQP Protocol library for D programming language.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net/
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module hunt.proton.engine.impl.TransportSender;

import hunt.proton.amqp.UnsignedInteger;
import hunt.proton.amqp.transport.Flow;
import hunt.proton.engine.impl.TransportLink;
import hunt.proton.engine.impl.SenderImpl;
import hunt.proton.engine.impl.DeliveryImpl;
import std.concurrency : initOnce;


/**
 * TransportLink specialization that is bound to a SenderImpl.
 *
 * It remembers the drain flag carried by the most recent Flow, holds a
 * reference to a delivery that is marked as "in progress", and recomputes
 * the link credit whenever a Flow is handled.
 */
class TransportSender : TransportLink
{
    private bool _drain;
    private DeliveryImpl _inProgressDelivery;

    /**
     * Delivery count assigned to a freshly constructed sender, and used in
     * handleFlow() when the incoming Flow carries no delivery count.
     *
     * The value is UnsignedInteger.ZERO. It is stored in a __gshared
     * variable that is initialized once through std.concurrency.initOnce;
     * this accessor stands in for a plain static field.
     *
     * Returns: the shared UnsignedInteger.ZERO instance.
     */
    static UnsignedInteger ORIGINAL_DELIVERY_COUNT()
    {
        __gshared UnsignedInteger zero;
        return initOnce!zero(UnsignedInteger.ZERO);
    }

    /**
     * Creates the transport state for the given sender link.
     *
     * Sets the delivery count to ORIGINAL_DELIVERY_COUNT and registers this
     * object on the link via setTransportLink().
     *
     * Params:
     *     link = the sender link this transport state belongs to.
     */
    this(SenderImpl link)
    {
        super(link);
        setDeliveryCount(ORIGINAL_DELIVERY_COUNT());
        link.setTransportLink(this);
    }

    /**
     * Applies an incoming Flow to this sender.
     *
     * After delegating to the base class, the drain flag is copied to this
     * object and to the sender link, the new transfer limit is derived from
     * the Flow's link credit plus its delivery count, and both the
     * transport-level link credit and the sender's credit are updated. The
     * sender's current delivery is then handed to the connection's
     * workUpdate().
     *
     * Params:
     *     flow = the Flow to apply.
     */
    override void handleFlow(Flow flow)
    {
        super.handleFlow(flow);

        // Mirror the drain flag locally and on the link.
        bool drainRequested = flow.getDrain().booleanValue;
        _drain = drainRequested;

        SenderImpl sender = cast(SenderImpl) getLink();
        sender.setDrain(drainRequested);

        // Snapshot the credit and the transfer limit before the update.
        int creditBefore = sender.getCredit();
        UnsignedInteger limitBefore = getLinkCredit().add(getDeliveryCount());

        // A Flow without a delivery count is treated as starting from
        // ORIGINAL_DELIVERY_COUNT.
        UnsignedInteger grantedCredit = flow.getLinkCredit();
        UnsignedInteger peerDeliveryCount = flow.getDeliveryCount();
        if (peerDeliveryCount is null)
        {
            peerDeliveryCount = ORIGINAL_DELIVERY_COUNT();
        }
        UnsignedInteger transferLimit = grantedCredit.add(peerDeliveryCount);

        // Credit that remains relative to our own delivery count.
        UnsignedInteger linkCredit = transferLimit.subtract(getDeliveryCount());
        setLinkCredit(linkCredit);

        // The sender's credit moves by the change in the transfer limit.
        sender.setCredit(transferLimit.subtract(limitBefore).intValue() + creditBefore);

        DeliveryImpl head = sender.current();
        sender.getConnectionImpl().workUpdate(head);

        // The link credit is applied a second time after workUpdate(); kept
        // as-is because workUpdate() is not visible from this file.
        setLinkCredit(linkCredit);
    }

    /**
     * Stores the delivery that is currently in progress.
     *
     * Params:
     *     inProgressDelivery = the delivery to remember; stored as given.
     */
    public void setInProgressDelivery(DeliveryImpl inProgressDelivery)
    {
        this._inProgressDelivery = inProgressDelivery;
    }

    /**
     * Returns: the delivery last passed to setInProgressDelivery(), or the
     * field's initial value if none was set.
     */
    public DeliveryImpl getInProgressDelivery()
    {
        return this._inProgressDelivery;
    }
}