U
    
W[4                     @   s   d Z ddlmZmZ ddlZddlZddlZddlZddlZddl	m
Z
 ddlmZ ddlmZ ddlmZmZmZmZ ddlmZ dd	lmZ G d
d dejZG dd dejZG dd deZdS )zD
Test running processes with the APIs in L{twisted.internet.utils}.
    )divisionabsolute_importN)_PY3)platform)unittest)errorreactorutils
interfaces)Deferred)SuppressedWarningsTestsc                   @   s   e Zd ZdZeeddkr dZdZdZ	e
jZdd Zdd Zdd	 Zd
d Zdd Zdd Zdd Ze rtde_dd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" ZdS )#ProcessUtilsTestszt
    Test running a process using L{getProcessOutput}, L{getProcessValue}, and
    L{getProcessOutputAndValue}.
    Nz)reactor doesn't implement IReactorProcessc              	   C   sB   |   }t|d}|tj|tj  W 5 Q R X tj|S )zj
        Write the given list of lines to a text file and return the absolute
        path to it.
        Zwt)mktempopenwriteoslinesepjoinpathabspath)selfZsourceLinesZscript
scriptFile r   :/usr/lib/python3/dist-packages/twisted/test/test_iutils.pymakeSourceFile!   s    "z ProcessUtilsTests.makeSourceFilec                 C   s>   |  ddddddddd	d
g
}t| jd|g}|| jdS )z
        L{getProcessOutput} returns a L{Deferred} which fires with the complete
        output of the process it runs after that process exits.
        
import syszfor s in b'hello world\n':z%    if hasattr(sys.stdout, 'buffer'):z        # Python 3z        s = bytes([s])z"        sys.stdout.buffer.write(s)z	    else:z        # Python 2z        sys.stdout.write(s)z    sys.stdout.flush()-us   hello world
r   r	   getProcessOutputexeaddCallbackassertEqualr   r   dr   r   r   test_output,   s    zProcessUtilsTests.test_outputc                    sF     ddg}t jd|g} |t} fdd}|| |S )z
        The L{Deferred} returned by L{getProcessOutput} is fired with an
        L{IOError} L{Failure} if the child process writes to stderr.
        r   z!sys.stderr.write("hello world\n")r   c                    s     | jtjS N)assertFailureZprocessEndedr   ZProcessDone)errr   r   r   cbFailedM   s    z?ProcessUtilsTests.test_outputWithErrorIgnored.<locals>.cbFailed)r   r	   r   r   r&   IOErrorr    )r   r   r#   r)   r   r(   r   test_outputWithErrorIgnored@   s    
z-ProcessUtilsTests.test_outputWithErrorIgnoredc                 C   s8   |  dddddg}tj| jd|gdd}|| jd	S )
z
        If a C{True} value is supplied for the C{errortoo} parameter to
        L{getProcessOutput}, the returned L{Deferred} fires with the child's
        stderr output as well as its stdout output.
        r   zsys.stdout.write("foo")sys.stdout.flush()zsys.stderr.write("foo")sys.stderr.flush()r   T)Zerrortoos   foofoor   r"   r   r   r   test_outputWithErrorCollectedS   s    	z/ProcessUtilsTests.test_outputWithErrorCollectedc                 C   s,   |  dg}t| jd|g}|| jdS )z|
        The L{Deferred} returned by L{getProcessValue} is fired with the exit
        status of the child process.
        zraise SystemExit(1)r      )r   r	   getProcessValuer   r    r!   r"   r   r   r   
test_valuef   s    zProcessUtilsTests.test_valuec                    sF     ddddddddd	d
g
} fdd}t jd|g}||S )a  
        The L{Deferred} returned by L{getProcessOutputAndValue} fires with a
        three-tuple, the elements of which give the data written to the child's
        stdout, the data written to the child's stderr, and the exit status of
        the child.
        r   z!if hasattr(sys.stdout, 'buffer'):z    # Python 3z.    sys.stdout.buffer.write(b'hello world!\n')z0    sys.stderr.buffer.write(b'goodbye world!\n')zelse:z    # Python 2z'    sys.stdout.write(b'hello world!\n')z)    sys.stderr.write(b'goodbye world!\n')zsys.exit(1)c                    sJ   | \}}}  |d tr(  |d n  |dtj    |d d S )Ns   hello world!
s   goodbye world!
s   goodbye world!r/   )r!   r   r   r   )Zout_err_codeoutr'   coder(   r   r   gotOutputAndValue   s    
z@ProcessUtilsTests.test_outputAndValue.<locals>.gotOutputAndValuer   )r   r	   getProcessOutputAndValuer   r    r   r   r4   r#   r   r(   r   test_outputAndValueq   s    	z%ProcessUtilsTests.test_outputAndValuec                    sJ     ddddddg} fdd}t jd	|g} |t}||S )
z
        If the child process exits because of a signal, the L{Deferred}
        returned by L{getProcessOutputAndValue} fires a L{Failure} of a tuple
        containing the child's stdout, stderr, and the signal which caused
        it to exit.
        zimport sys, os, signalz"sys.stdout.write('stdout bytes\n')z"sys.stderr.write('stderr bytes\n')r,   r-   z$os.kill(os.getpid(), signal.SIGKILL)c                    s4   | \}}}  |d   |d   |tj d S )Ns   stdout bytes
s   stderr bytes
)r!   signalSIGKILL)Zout_err_sigr2   r'   Zsigr(   r   r   r4      s    
z>ProcessUtilsTests.test_outputSignal.<locals>.gotOutputAndValuer   )r   r	   r5   r   r&   tupler    r6   r   r(   r   test_outputSignal   s    
z#ProcessUtilsTests.test_outputSignalz"Windows doesn't have real signals.c                 C   sV   t j|  }t | | ddg}|| jd|g|d}|||t	
  |S )Nzimport os, syssys.stdout.write(os.getcwd())r   )r   )r   r   r   r   makedirsr   r   r    encodesysgetfilesystemencodingr   ZutilFunccheckdirr   r#   r   r   r   	_pathTest   s    
zProcessUtilsTests._pathTestc                 C   s   |  tj| jS )z
        L{getProcessOutput} runs the given command with the working directory
        given by the C{path} parameter.
        )rD   r	   r   r!   r(   r   r   r   test_getProcessOutputPath   s    z+ProcessUtilsTests.test_getProcessOutputPathc                    s    fdd}  tj|S )z~
        L{getProcessValue} runs the given command with the working directory
        given by the C{path} parameter.
        c                    s     | d d S Nr   r!   resultZignoredr(   r   r   rB      s    z9ProcessUtilsTests.test_getProcessValuePath.<locals>.check)rD   r	   r0   r   rB   r   r(   r   test_getProcessValuePath   s    z*ProcessUtilsTests.test_getProcessValuePathc                    s    fdd}  tj|S )z
        L{getProcessOutputAndValue} runs the given command with the working
        directory given by the C{path} parameter.
        c                    s&   | \}}}  ||   |d d S rF   rG   Zout_err_statusrC   r2   r'   Zstatusr(   r   r   rB      s    
zBProcessUtilsTests.test_getProcessOutputAndValuePath.<locals>.check)rD   r	   r5   rJ   r   r(   r   !test_getProcessOutputAndValuePath   s    z3ProcessUtilsTests.test_getProcessOutputAndValuePathc              	   C   s   t j|  }t | | dd|f dg}| t jt   t | | t j	|t
t 
dj t 	|d || jdd|g}|||t  |S )Nzimport os, sys, statzos.chmod(%r, stat.S_IXUSR)r<   .r   z-Sr   )r   r   r   r   r=   r   Z
addCleanupchdirgetcwdchmodstatS_IMODEst_moder   r    r>   r?   r@   rA   r   r   r   _defaultPathTest   s$    
	
  z"ProcessUtilsTests._defaultPathTestc                 C   s   |  tj| jS )a  
        If no value is supplied for the C{path} parameter, L{getProcessOutput}
        runs the given command in the same working directory as the parent
        process and succeeds even if the current working directory is not
        accessible.
        )rU   r	   r   r!   r(   r   r   r    test_getProcessOutputDefaultPath   s    z2ProcessUtilsTests.test_getProcessOutputDefaultPathc                    s    fdd}  tj|S )a   
        If no value is supplied for the C{path} parameter, L{getProcessValue}
        runs the given command in the same working directory as the parent
        process and succeeds even if the current working directory is not
        accessible.
        c                    s     | d d S rF   rG   rH   r(   r   r   rB     s    z@ProcessUtilsTests.test_getProcessValueDefaultPath.<locals>.check)rU   r	   r0   rJ   r   r(   r   test_getProcessValueDefaultPath  s    z1ProcessUtilsTests.test_getProcessValueDefaultPathc                    s    fdd}  tj|S )a	  
        If no value is supplied for the C{path} parameter,
        L{getProcessOutputAndValue} runs the given command in the same working
        directory as the parent process and succeeds even if the current
        working directory is not accessible.
        c                    s&   | \}}}  ||   |d d S rF   rG   rL   r(   r   r   rB     s    
zIProcessUtilsTests.test_getProcessOutputAndValueDefaultPath.<locals>.check)rU   r	   r5   rJ   r   r(   r   (test_getProcessOutputAndValueDefaultPath  s
     z:ProcessUtilsTests.test_getProcessOutputAndValueDefaultPath)__name__
__module____qualname____doc__r
   ZIReactorProcessr   skipoutputvaluer?   
executabler   r   r$   r+   r.   r1   r7   r;   r   Z	isWindowsrD   rE   rK   rM   rU   rV   rW   rX   r   r   r   r   r      s.   !
 
r   c                   @   s   e Zd ZdZdd ZdS )SuppressWarningsTestsz.
    Tests for L{utils.suppressWarnings}.
    c                    s   g   fdd}|  td| dd }t|dtddf}|d	 | t d
 |d | t d
 |d | t d dS )zs
        L{utils.suppressWarnings} decorates a function so that the given
        warnings are suppressed.
        c                    s     ||f d S r%   )append)r   akwrI   r   r   showwarning*  s    z@SuppressWarningsTests.test_suppressWarnings.<locals>.showwarningrf   c                 S   s   t |  d S r%   )warningswarn)msgr   r   r   f.  s    z6SuppressWarningsTests.test_suppressWarnings.<locals>.f)ignorezThis is messagemessagezSanity check messager/   zUnignored message   N)Zpatchrg   r	   ZsuppressWarningsdictr!   len)r   rf   rj   gr   re   r   test_suppressWarnings$  s    z+SuppressWarningsTests.test_suppressWarningsN)rY   rZ   r[   r\   rr   r   r   r   r   ra      s   ra   c                   @   s*   e Zd ZdZeejZdd Zdd ZdS )DeferredSuppressedWarningsTestsz`
    Tests for L{utils.runWithWarningsSuppressed}, the version that supports
    Deferreds.
    c                    sh   di fdi fg}t   | | fdd td  d td | dgdd	 |  D  d
S )z
        If the function called by L{utils.runWithWarningsSuppressed} returns a
        C{Deferred}, the warning filters aren't removed until the Deferred
        fires.
        rk   z.*foo.*rk   z.*bar.*c                      s    S r%   r   r   re   r   r   <lambda>U      zGDeferredSuppressedWarningsTests.test_deferredCallback.<locals>.<lambda>
ignore foo   ignore foo 2c                 S   s   g | ]}|d  qS rl   r   .0wr   r   r   
<listcomp>Z  s     zIDeferredSuppressedWarningsTests.test_deferredCallback.<locals>.<listcomp>N)r   runWithWarningsSuppressedrg   rh   callbackr!   flushWarnings)r   filtersr   re   r   test_deferredCallbackL  s    


 z5DeferredSuppressedWarningsTests.test_deferredCallbackc                    sx   di fdi fg}t   | | fdd}td  t  |dd  td | dgdd	 |  D  d
S )z
        If the function called by L{utils.runWithWarningsSuppressed} returns a
        C{Deferred}, the warning filters aren't removed until the Deferred
        fires with an errback.
        rt   ru   c                      s    S r%   r   r   re   r   r   rv   f  rw   zFDeferredSuppressedWarningsTests.test_deferredErrback.<locals>.<lambda>rx   c                 S   s
   |  tS r%   )ZtrapZeroDivisionError)rj   r   r   r   rv   i  rw   rz   c                 S   s   g | ]}|d  qS rl   r   r{   r   r   r   r~   l  s     zHDeferredSuppressedWarningsTests.test_deferredErrback.<locals>.<listcomp>N)	r   r   rg   rh   Zerrbackr   Z
addErrbackr!   r   )r   r   r#   r   re   r   test_deferredErrback]  s    

 z4DeferredSuppressedWarningsTests.test_deferredErrbackN)	rY   rZ   r[   r\   staticmethodr	   r   r   r   r   r   r   r   rs   C  s   
rs   )r\   Z
__future__r   r   rg   r   rR   r?   r8   Ztwisted.python.compatr   Ztwisted.python.runtimer   Ztwisted.trialr   Ztwisted.internetr   r   r	   r
   Ztwisted.internet.deferr   Ztwisted.python.test.test_utilr   ZTestCaser   ZSynchronousTestCasera   rs   r   r   r   r   <module>   s   (  #