U
    
W[°  ã                   @   sL   d Z ddlmZ ddlmZ eejejƒG dd„ dƒƒZG dd„ deƒZdS )	z
Producer-Consumer Proxy.
é    )Úimplementer)Ú
interfacesc                   @   st   e Zd ZdZdZdZdZdZdZdZ	dZ
dd„ Zdd„ Zd	d
„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ ZdS )ÚBasicProducerConsumerProxyaa  
    I can act as a man in the middle between any Producer and Consumer.

    @ivar producer: the Producer I subscribe to.
    @type producer: L{IProducer<interfaces.IProducer>}
    @ivar consumer: the Consumer I publish to.
    @type consumer: L{IConsumer<interfaces.IConsumer>}
    @ivar paused: As a Producer, am I paused?
    @type paused: bool
    NTFc                 C   s&   g | _ |d k	r"|| _| | | j¡ d S ©N)Ú_bufferÚconsumerÚregisterProducerÚiAmStreaming)Úselfr   © r   ú7/usr/lib/python3/dist-packages/twisted/protocols/pcp.pyÚ__init__"   s    z#BasicProducerConsumerProxy.__init__c                 C   s   d| _ | jr| j ¡  d S ©NT)ÚpausedÚproducerÚpauseProducing©r
   r   r   r   r   *   s    z)BasicProducerConsumerProxy.pauseProducingc                 C   sT   d| _ | jr0| j d | j¡¡ g | jd d …< n| js<d| _| jd k	rP| j ¡  d S )NFÚ T)	r   r   r   ÚwriteÚjoinr	   ÚoutstandingPullr   ÚresumeProducingr   r   r   r   r   /   s    
z*BasicProducerConsumerProxy.resumeProducingc                 C   s&   | j d k	r| j  ¡  | jd k	r"| `d S r   )r   ÚstopProducingr   r   r   r   r   r   <   s    


z(BasicProducerConsumerProxy.stopProducingc                 C   s@   | j s| js | js | j |¡ n| jd k	r<| j |¡ d| _d S ©NF)r   r	   r   r   Úappendr   r   ©r
   Údatar   r   r   r   D   s
    
z BasicProducerConsumerProxy.writec                 C   s    | j d k	r| j  ¡  |  ¡  d S r   )r   ÚfinishÚunregisterProducerr   r   r   r   r   M   s    

z!BasicProducerConsumerProxy.finishc                 C   s   || _ || _d S r   )r   ÚproducerIsStreaming©r
   r   Z	streamingr   r   r   r   R   s    z+BasicProducerConsumerProxy.registerProducerc                 C   s&   | j d k	r| ` | `| jr"| j ¡  d S r   )r   r   r   r   r   r   r   r   r   V   s
    
z-BasicProducerConsumerProxy.unregisterProducerc                 C   s   d| j t| ƒ| jf S )Nz<%s@%x around %s>)Ú	__class__Úidr   r   r   r   r   Ú__repr__]   s    z#BasicProducerConsumerProxy.__repr__)Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   r   r   r	   r   r   Zstoppedr   r   r   r   r   r   r   r   r#   r   r   r   r   r      s"   
	r   c                   @   sL   e Zd ZdZdZdZdZdd„ Zdd„ Zdd	„ Z	d
d„ Z
dd„ Zdd„ ZdS )ÚProducerConsumerProxyz˜ProducerConsumerProxy with a finite buffer.

    When my buffer fills up, I have my parent Producer pause until my buffer
    has room in it again.
    i   Fc                 C   s
   d| _ d S r   )r   r   r   r   r   r   m   s    z$ProducerConsumerProxy.pauseProducingc                 C   sô   d| _ | jrjd | j¡}|  |¡}|t|ƒk rZ||d … }| jrHtdƒ‚|g| jd d …< qng | jd d …< nd}| jr’|r’| js’| jd k	r’| j 	¡  | js | | _
| jd k	rðtdd„ | jD ƒƒ}| jrà|| jk ràd| _| j ¡  n| j
rð| j ¡  d S )NFr   ú.Streaming producer did not write all its data.r   c                 S   s   g | ]}t |ƒ‘qS r   ©Úlen©Ú.0Úsr   r   r   Ú
<listcomp>‰   s     z9ProducerConsumerProxy.resumeProducing.<locals>.<listcomp>)r   r   r   Ú_writeSomeDatar+   r	   ÚAssertionErrorÚunregisteredr   r   r   r   ÚsumÚproducerPausedÚ
bufferSizer   )r
   r   Ú	bytesSentZunsentÚbytesBufferedr   r   r   r   r   s2    
ÿÿ

z%ProducerConsumerProxy.resumeProducingc                 C   s¸   | j s| js | js | j |¡ nV| jd k	rv| jr8tdƒ‚|  |¡}d| _|t|ƒksv| jrbtdƒ‚| j ||d … ¡ | j	d k	r´| j
r´tdd„ | jD ƒƒ}|| jkr´| j	 ¡  d| _d S )Nz9Writing fresh data to consumer before my buffer is empty!Fr)   c                 S   s   g | ]}t |ƒ‘qS r   r*   r,   r   r   r   r/   ¯   s     z/ProducerConsumerProxy.write.<locals>.<listcomp>T)r   r	   r   r   r   r   r1   r0   r+   r   r   r3   r5   r   r4   )r
   r   r6   r7   r   r   r   r   š   s$    
ÿ
ÿ

zProducerConsumerProxy.writec                 C   s$   d| _ t | ||¡ |s | ¡  d S r   )r2   r   r   r   r    r   r   r   r   µ   s    z&ProducerConsumerProxy.registerProducerc                 C   s2   | j d k	r| ` | `d| _| jr.| js.| j ¡  d S r   )r   r   r2   r   r   r   r   r   r   r   r   »   s    
z(ProducerConsumerProxy.unregisterProducerc                 C   s"   | j dkrdS | j  |¡ t|ƒS )z`Write as much of this data as possible.

        @returns: The number of bytes written.
        Nr   )r   r   r+   r   r   r   r   r0   Ã   s    
z$ProducerConsumerProxy._writeSomeDataN)r$   r%   r&   r'   r5   r4   r2   r   r   r   r   r   r0   r   r   r   r   r(   a   s   (r(   N)	r'   Zzope.interfacer   Ztwisted.internetr   Z	IProducerZ	IConsumerr   r(   r   r   r   r   Ú<module>   s
   R