B
    0`"                 @   sL   d Z ddlmZ ddlmZ eejejG dd dZG dd deZdS )	z
Producer-Consumer Proxy.
    )implementer)
interfacesc               @   sz   e Zd ZdZdZdZdZdZdZdZ	dZ
dd Zdd Zd	d
 Zdd Zdd Zdd Zdd Zdd ZedddZdS )BasicProducerConsumerProxyaa  
    I can act as a man in the middle between any Producer and Consumer.

    @ivar producer: the Producer I subscribe to.
    @type producer: L{IProducer<interfaces.IProducer>}
    @ivar consumer: the Consumer I publish to.
    @type consumer: L{IConsumer<interfaces.IConsumer>}
    @ivar paused: As a Producer, am I paused?
    @type paused: bool
    NTFc             C   s&   g | _ |d k	r"|| _|| | j d S )N)_bufferconsumerregisterProduceriAmStreaming)selfr    r
   D/home/dcms/DCMS/lib/python3.7/site-packages/twisted/protocols/pcp.py__init__#   s    z#BasicProducerConsumerProxy.__init__c             C   s   d| _ | jr| j  d S )NT)pausedproducerpauseProducing)r	   r
   r
   r   r   +   s    z)BasicProducerConsumerProxy.pauseProducingc             C   sT   d| _ | jr0| jd| j g | jd d < n| js<d| _| jd k	rP| j  d S )NF T)	r   r   r   writejoinr   outstandingPullr   resumeProducing)r	   r
   r
   r   r   0   s    
z*BasicProducerConsumerProxy.resumeProducingc             C   s&   | j d k	r| j   | jd k	r"| `d S )N)r   stopProducingr   )r	   r
   r
   r   r   =   s    


z(BasicProducerConsumerProxy.stopProducingc             C   s@   | j s| js | js | j| n| jd k	r<| j| d| _d S )NF)r   r   r   r   appendr   r   )r	   datar
   r
   r   r   E   s
    
z BasicProducerConsumerProxy.writec             C   s    | j d k	r| j   |   d S )N)r   finishunregisterProducer)r	   r
   r
   r   r   N   s    

z!BasicProducerConsumerProxy.finishc             C   s   || _ || _d S )N)r   producerIsStreaming)r	   r   	streamingr
   r
   r   r   S   s    z+BasicProducerConsumerProxy.registerProducerc             C   s&   | j d k	r| ` | `| jr"| j  d S )N)r   r   r   r   )r	   r
   r
   r   r   W   s
    
z-BasicProducerConsumerProxy.unregisterProducer)returnc             C   s   d | jt| | jS )Nz<{}@{:x} around {}>)format	__class__idr   )r	   r
   r
   r   __repr__^   s    z#BasicProducerConsumerProxy.__repr__)__name__
__module____qualname____doc__r   r   r   r   r   r   stoppedr   r   r   r   r   r   r   r   strr    r
   r
   r
   r   r      s"   	r   c               @   sL   e Zd ZdZdZdZdZdd Zdd Zdd	 Z	d
d Z
dd Zdd ZdS )ProducerConsumerProxyzProducerConsumerProxy with a finite buffer.

    When my buffer fills up, I have my parent Producer pause until my buffer
    has room in it again.
    i   Fc             C   s
   d| _ d S )NT)r   )r	   r
   r
   r   r   o   s    z$ProducerConsumerProxy.pauseProducingc             C   s   d| _ | jrjd| j}| |}|t|k rZ||d  }| jrHtd|g| jd d < qng | jd d < nd}| jr|r| js| jd k	r| j	  | js| | _
| jd k	rtdd | jD }| jr|| jk rd| _| j  n| j
r| j  d S )NFr   z.Streaming producer did not write all its data.r   c             S   s   g | ]}t |qS r
   )len).0sr
   r
   r   
<listcomp>   s    z9ProducerConsumerProxy.resumeProducing.<locals>.<listcomp>)r   r   r   _writeSomeDatar(   r   AssertionErrorunregisteredr   r   r   r   sumproducerPaused
bufferSizer   )r	   r   	bytesSentZunsentbytesBufferedr
   r
   r   r   t   s2    



z%ProducerConsumerProxy.resumeProducingc             C   s   | j s| js | js | j| nV| jd k	rv| jr8td| |}d| _|t|ksv| jrbtd| j||d   | j	d k	r| j
rtdd | jD }|| jkr| j	  d| _d S )Nz9Writing fresh data to consumer before my buffer is empty!Fz.Streaming producer did not write all its data.c             S   s   g | ]}t |qS r
   )r(   )r)   r*   r
   r
   r   r+      s    z/ProducerConsumerProxy.write.<locals>.<listcomp>T)r   r   r   r   r   r   r-   r,   r(   r   r   r/   r1   r   r0   )r	   r   r2   r3   r
   r
   r   r      s     



zProducerConsumerProxy.writec             C   s$   d| _ t| || |s |  d S )NF)r.   r   r   r   )r	   r   r   r
   r
   r   r      s    z&ProducerConsumerProxy.registerProducerc             C   s2   | j d k	r| ` | `d| _| jr.| js.| j  d S )NT)r   r   r.   r   r   r   )r	   r
   r
   r   r      s    
z(ProducerConsumerProxy.unregisterProducerc             C   s"   | j dkrdS | j | t|S )z`Write as much of this data as possible.

        @returns: The number of bytes written.
        Nr   )r   r   r(   )r	   r   r
   r
   r   r,      s    
z$ProducerConsumerProxy._writeSomeDataN)r!   r"   r#   r$   r1   r0   r.   r   r   r   r   r   r,   r
   r
   r
   r   r'   b   s   -r'   N)	r$   Zzope.interfacer   Ztwisted.internetr   Z	IProducerZ	IConsumerr   r'   r
   r
   r
   r   <module>   s   T