U
    
W[d3                     @   s$  d Z ddlmZmZ ddlZddlZddlZddlmZ ddl	m
Z
mZ ddlmZ ddlmZ ddlmZmZmZ dd	lmZmZmZmZmZ dd
lmZ dZdZe reddkrdZeej Z!ej"#ej$e!d< n"e Z!ej"#ej$%e& e!d< G dd dej'Z(G dd dej)Z*dS )z
Tests for L{twisted.internet.stdio}.

@var properEnv: A copy of L{os.environ} which has L{bytes} keys/values on POSIX
    platforms and native L{str} keys/values on Windows.
    )absolute_importdivisionN)unittest)filepathlog)requireModule)platform)range
intToBytesbytesEnviron)errordeferprotocolstdioreactor)ConnectionLostNotifyingProtocols   xyz123abc Twisted is great!Zwin32processzIOn windows, spawnProcess is not available in the absence of win32process.
PYTHONPATHs
   PYTHONPATHc                   @   s4   e Zd ZdZdZdd Zdd Zdd Zd	d
 ZdS )StandardIOTestProcessProtocola  
    Test helper for collecting output from a child process and notifying
    something when it exits.

    @ivar onConnection: A L{defer.Deferred} which will be called back with
    L{None} when the connection to the child process is established.

    @ivar onCompletion: A L{defer.Deferred} which will be errbacked with the
    failure associated with the child process exiting when it exits.

    @ivar onDataReceived: A L{defer.Deferred} which will be called back with
    this instance whenever C{childDataReceived} is called, or L{None} to
    suppress these callbacks.

    @ivar data: A C{dict} mapping file descriptors to strings containing all
    bytes received from the child process on each file descriptor.
    Nc                 C   s   t  | _t  | _i | _d S N)r   DeferredonConnectiononCompletiondataself r   9/usr/lib/python3/dist-packages/twisted/test/test_stdio.py__init__B   s    

z&StandardIOTestProcessProtocol.__init__c                 C   s   | j d  d S r   )r   callbackr   r   r   r   connectionMadeH   s    z,StandardIOTestProcessProtocol.connectionMadec                 C   s>   | j |d| | j |< | jdk	r:| jd }| _||  dS )z
        Record all bytes received from the child process in the C{data}
        dictionary.  Fire C{onDataReceived} if it is not L{None}.
            N)r   getonDataReceivedr   )r   namebytesdr   r   r   childDataReceivedL   s    
z/StandardIOTestProcessProtocol.childDataReceivedc                 C   s   | j | d S r   )r   r   )r   reasonr   r   r   processEndedW   s    z*StandardIOTestProcessProtocol.processEnded)	__name__
__module____qualname____doc__r"   r   r   r&   r(   r   r   r   r   r   .   s   r   c                   @   s~   e Zd ZeZdd Zdd Zdd Zdd Zd	d
 Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Ze rzde_dS )StandardInputOutputTestsc                 O   s:   t jdd| tjjgt| }tj|t j|fdti|S )a_  
        Launch a child Python process and communicate with it using the
        given ProcessProtocol.

        @param proto: A L{ProcessProtocol} instance which will be connected
        to the child process.

        @param sibling: The basename of a file containing the Python program
        to run in the child process.

        @param *args: strings which will be passed to the child process on
        the command line as C{argv[2:]}.

        @param **kw: additional arguments to pass to L{reactor.spawnProcess}.

        @return: The L{IProcessTransport} provider for the spawned process.
        s   -ms   twisted.test.env)sys
executabler   	__class__r*   listZspawnProcess	properEnv)r   protoZsiblingargskwr   r   r   _spawnProcess`   s      z&StandardInputOutputTests._spawnProcessc                    s$   fdd} fdd}| ||S )Nc                    s     d| f  d S )Nz'Process terminated with non-Failure: %r)Zfail)resultr   r   r   cb~   s    z4StandardInputOutputTests._requireFailure.<locals>.cbc                    s    | S r   r   )err)r   r   r   eb   s    z4StandardInputOutputTests._requireFailure.<locals>.eb)ZaddCallbacks)r   r%   r   r9   r;   r   )r   r   r   _requireFailure}   s    z(StandardInputOutputTests._requireFailurec                    sL      td   t j}d   fdd}||S )z
        Verify that a protocol connected to L{StandardIO} can disconnect
        itself using C{transport.loseConnection}.
        Child process logging to s   stdio_test_loseconnc              	      sP   t  d"}|D ]}td|   qW 5 Q R X dj | tj d S )NrzChild logged:    )	openr   msgrstripZfailIfInr   trapr   ProcessDone)r'   flineerrorLogFilepr   r   r   r(      s
    zBStandardInputOutputTests.test_loseConnection.<locals>.processEnded)mktempr   rA   r   r   r7   r<   r   r%   r(   r   rG   r   test_loseConnection   s    z,StandardInputOutputTests.test_loseConnectionc                    sf   |   }td|  t  t  _ fdd} j| dd }|  j|}| 	 d| |S )z
        When stdin is closed and the protocol connected to it implements
        L{IHalfCloseableProtocol}, the protocol's C{readConnectionLost} method
        is called.
        r=   c                    s    j } j  |S r   )r   Z	transportZ
closeStdin)Zignoredr%   rI   r   r   cbBytes   s    
zAStandardInputOutputTests.test_readConnectionLost.<locals>.cbBytesc                 S   s   |  tj d S r   )rC   r   rD   r'   r   r   r   r(      s    zFStandardInputOutputTests.test_readConnectionLost.<locals>.processEndeds   stdio_test_halfclose)
rJ   r   rA   r   r   r   r"   addCallbackr<   r7   )r   rH   rN   r(   r%   r   rM   r   test_readConnectionLost   s    
  z0StandardInputOutputTests.test_readConnectionLostc              
      sj   t   zj dtdd W n0 tk
rL } ztt|W 5 d}~X Y nX  fdd} j|S )z
        Verify that a write made directly to stdout using L{os.write}
        after StandardIO has finished is reliably received by the
        process reading that stdout.
        s   stdio_test_lastwriteT)ZusePTYNc                    s0     jd td jf  | tj dS )z
            Asserts that the parent received the bytes written by the child
            immediately after the child starts.
            r?   z4Received %r from child, did not find expected bytes.N)
assertTruer   endswithUNIQUE_LAST_WRITE_STRINGrC   r   rD   rO   rI   r   r   r   r(      s    zEStandardInputOutputTests.test_lastWriteReceived.<locals>.processEnded)	r   r7   rT   
ValueErrorr   ZSkipTeststrr<   r   )r   er(   r   rU   r   test_lastWriteReceived   s      
 
z/StandardInputOutputTests.test_lastWriteReceivedc                    s2   t    j} d  fdd}||S )z
        Verify that the transport of a protocol connected to L{StandardIO}
        has C{getHost} and C{getPeer} methods.
        s   stdio_test_hostpeerc                    s6    j d  \}}| | | tj d S )Nr?   )r   
splitlinesrR   rC   r   rD   )r'   ZhostZpeerrU   r   r   r(      s    

z?StandardInputOutputTests.test_hostAndPeer.<locals>.processEndedr   r   r7   r<   rK   r   rU   r   test_hostAndPeer   s
    z)StandardInputOutputTests.test_hostAndPeerc                    s2   t    j} d  fdd}||S )z
        Verify that the C{write} method of the transport of a protocol
        connected to L{StandardIO} sends bytes to standard out.
        s   stdio_test_writec                    s"     jd d | tj d S Nr?   s   ok!assertEqualr   rC   r   rD   rO   rU   r   r   r(      s    z9StandardInputOutputTests.test_write.<locals>.processEndedr[   rK   r   rU   r   
test_write   s
    z#StandardInputOutputTests.test_writec                    s2   t    j} d  fdd}||S )z
        Verify that the C{writeSequence} method of the transport of a
        protocol connected to L{StandardIO} sends bytes to standard out.
        s   stdio_test_writeseqc                    s"     jd d | tj d S r]   r^   rO   rU   r   r   r(     s    zAStandardInputOutputTests.test_writeSequence.<locals>.processEndedr[   rK   r   rU   r   test_writeSequence   s
    z+StandardInputOutputTests.test_writeSequencec              	   C   sB   |   }t|d&}tdD ]}|t|d  qW 5 Q R X |S )Nwbi      
)rJ   r@   r	   writer
   )r   junkPathZjunkFileir   r   r   	_junkPath  s
    z"StandardInputOutputTests._junkPathc                    sd   t  j}g ttd fdd dj  fdd}||S )z
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IProducer} provider.
        d   c                    s:   r6 t d  d  td d  d S )Nrc   g{Gz?)appendr
   poprd   r   	callLater)Zign)r   proctoWritewrittenr   r   r      s    z>StandardInputOutputTests.test_producer.<locals>.connectionMades   stdio_test_producerc                    s>     jd d dtf  | tj d S )Nr?   r    z*Connection lost with %d writes left to go.)r_   r   joinZassertFalselenrC   r   rD   rO   )rI   r   rn   ro   r   r   r(   *  s    z<StandardInputOutputTests.test_producer.<locals>.processEnded)r   r   r2   r	   r7   r   rP   r<   rK   r   )r   rI   rm   r   rn   ro   r   test_producer  s    z&StandardInputOutputTests.test_producerc                    s>   t  j}  d   fdd}||S )z
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IConsumer} provider.
        s   stdio_test_consumerc              	      s<   t  d}jd |  W 5 Q R X | tj d S )Nrbr?   )r@   r_   r   readrC   r   rD   )r'   rE   re   rI   r   r   r   r(   ?  s     z<StandardInputOutputTests.test_consumer.<locals>.processEnded)r   r   rg   r7   r<   rK   r   ru   r   test_consumer3  s    z&StandardInputOutputTests.test_consumerc                    s   t  }t|}t d _}|j	 t
| d}t st \}}tj	| tj	| ||d< tj|f| dt  fddtd fdd	}|| |S )
aE  
        If L{StandardIO} is created with a file descriptor which refers to a
        normal file (ie, a file from the filesystem), L{StandardIO.write}
        writes bytes to that file.  In particular, it does not immediately
        consider the file closed or call its protocol's C{connectionLost}
        method.
        rb   )stdoutstdin   c                     sB   D ],} | kr     d S  t|   q2qtd d S )Nr   )ZloseConnectionrd   r
   r   rl   )value)
connectioncounthowManyspinr   r   r~   e  s    zAStandardInputOutputTests.test_normalFileStandardOut.<locals>.spinr   c              	      s8    t d    dttt d S )Nr?   r    )r_   nextZ
getContentrp   mapr
   r	   rO   )r|   r}   pathr   r   r   cbLostq  s
    zCStandardInputOutputTests.test_normalFileStandardOut.<locals>.cbLost)r   r   r   r   ZFilePathrJ   r@   normalZ
addCleanupclosedictfilenor   	isWindowsospiper   Z
StandardIO	itertoolsr|   r   rl   rP   )r   Z
onConnLostr4   r   kwargsr>   wr   r   )r{   r|   r}   r   r   r~   r   test_normalFileStandardOutF  s&    
z3StandardInputOutputTests.test_normalFileStandardOutzpStandardIO does not accept stdout as an argument to Windows.  Testing redirection to a file is therefore harder.N)r)   r*   r+   skipWindowsNopywin32skipr7   r<   rL   rQ   rY   r\   r`   ra   rg   rr   rv   r   r   r   r   r   r   r   r-   \   s    '3r-   )+r,   Z
__future__r   r   r   r/   r   Ztwisted.trialr   Ztwisted.pythonr   r   Ztwisted.python.reflectr   Ztwisted.python.runtimer   Ztwisted.python.compatr	   r
   r   Ztwisted.internetr   r   r   r   r   Ztwisted.test.test_tcpr   rT   r   r   r   environr3   pathseprp   r   encodegetfilesystemencodingZProcessProtocolr   ZTestCaser-   r   r   r   r   <module>   s0   
.