U
    
W[¨  ã                   @   s†   d Z ddlmZmZ ddlmZ zddlmZ W n ek
rH   dZY nX ddl	m
Z
 ddlmZ G dd	„ d	eƒZG d
d„ deƒZdS )z-
Tests for L{twisted.internet.epollreactor}.
é    )ÚdivisionÚabsolute_import)ÚTestCase)Ú_ContinuousPollingN)ÚClock©ÚConnectionDonec                   @   s8   e Zd ZdZdd„ Zdd„ Zdd„ Zdd	„ Zd
d„ ZdS )Ú
DescriptorzF
    Records reads and writes, as if it were a C{FileDescriptor}.
    c                 C   s
   g | _ d S ©N)Úevents©Úself© r   úI/usr/lib/python3/dist-packages/twisted/internet/test/test_epollreactor.pyÚ__init__   s    zDescriptor.__init__c                 C   s   dS )Né   r   r   r   r   r   Úfileno   s    zDescriptor.filenoc                 C   s   | j  d¡ d S )NÚread©r   Úappendr   r   r   r   ÚdoRead!   s    zDescriptor.doReadc                 C   s   | j  d¡ d S )NÚwriter   r   r   r   r   ÚdoWrite%   s    zDescriptor.doWritec                 C   s   |  t¡ | j d¡ d S )NÚlost)Ztrapr   r   r   )r   Úreasonr   r   r   ÚconnectionLost)   s    
zDescriptor.connectionLostN)	Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   r   r   r   r   r   r   r   r   r	      s   r	   c                   @   s„   e Zd ZdZdd„ Zdd„ Zdd„ Zdd	„ Zd
d„ Zdd„ Z	dd„ Z
dd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zedkr€dZdS )ÚContinuousPollingTestsza
    L{_ContinuousPolling} can be used to read and write from C{FileDescriptor}
    objects.
    c                 C   sv   t tƒ ƒ}|  |j¡ tƒ }|  | |¡¡ | |¡ |  |j¡ |  	|jj
¡ |  |jj|j¡ |  	| |¡¡ dS )zi
        Adding a reader when there was previously no reader starts up a
        C{LoopingCall}.
        N)r   r   ÚassertIsNoneÚ_loopÚobjectÚassertFalseÚ	isReadingÚ	addReaderÚassertIsNotNoneÚ
assertTrueÚrunningÚassertIsÚclockÚ_reactor©r   ÚpollerÚreaderr   r   r   Útest_addReader5   s    

z%ContinuousPollingTests.test_addReaderc                 C   sv   t tƒ ƒ}|  |j¡ tƒ }|  | |¡¡ | |¡ |  |j¡ |  	|jj
¡ |  |jj|j¡ |  	| |¡¡ dS )zi
        Adding a writer when there was previously no writer starts up a
        C{LoopingCall}.
        N)r   r   r!   r"   r#   r$   Ú	isWritingÚ	addWriterr'   r(   r)   r*   r+   r,   ©r   r.   Úwriterr   r   r   Útest_addWriterE   s    

z%ContinuousPollingTests.test_addWriterc                 C   sV   t tƒ ƒ}tƒ }| |¡ | |¡ |  |j¡ |  |j 	¡ g ¡ |  
| |¡¡ dS )z=
        Removing a reader stops the C{LoopingCall}.
        N)r   r   r#   r&   ÚremoveReaderr!   r"   ÚassertEqualr,   ÚgetDelayedCallsr$   r%   r-   r   r   r   Útest_removeReaderU   s    


z(ContinuousPollingTests.test_removeReaderc                 C   sV   t tƒ ƒ}tƒ }| |¡ | |¡ |  |j¡ |  |j 	¡ g ¡ |  
| |¡¡ dS )z=
        Removing a writer stops the C{LoopingCall}.
        N)r   r   r#   r2   ÚremoveWriterr!   r"   r7   r,   r8   r$   r1   r3   r   r   r   Útest_removeWriterb   s    


z(ContinuousPollingTests.test_removeWriterc                 C   s&   t tƒ ƒ}| tƒ ¡ | tƒ ¡ dS )zM
        Removing unknown readers and writers silently does nothing.
        N)r   r   r:   r#   r6   )r   r.   r   r   r   Útest_removeUnknowno   s    
z)ContinuousPollingTests.test_removeUnknownc                 C   s    t tƒ ƒ}tƒ }| |¡ |  |j¡ | tƒ ¡ |  |j¡ | tƒ ¡ |  |j¡ | tƒ ¡ | |¡ |  |j¡ |  |jj	¡ |  
t|j ¡ ƒd¡ dS )za
        Adding multiple readers and writers results in a single
        C{LoopingCall}.
        r   N)r   r   r#   r2   r'   r"   r&   r:   r(   r)   r7   Úlenr,   r8   r3   r   r   r   Útest_multipleReadersAndWritersx   s    


z5ContinuousPollingTests.test_multipleReadersAndWritersc                 C   s„   t ƒ }t|ƒ}tƒ }| |¡ |  |jg ¡ | d¡ |  |jdg¡ | d¡ |  |jddg¡ | d¡ |  |jdddg¡ dS )za
        Adding a reader causes its C{doRead} to be called every 1
        milliseconds.
        gñhãˆµøä>r   N)r   r   r	   r&   r7   r   Úadvance©r   Zreactorr.   Zdescr   r   r   Útest_readerPollingŒ   s    



z)ContinuousPollingTests.test_readerPollingc                 C   s„   t ƒ }t|ƒ}tƒ }| |¡ |  |jg ¡ | d¡ |  |jdg¡ | d¡ |  |jddg¡ | d¡ |  |jdddg¡ dS )zb
        Adding a writer causes its C{doWrite} to be called every 1
        milliseconds.
        çü©ñÒMbP?r   N)r   r   r	   r2   r7   r   r?   r@   r   r   r   Útest_writerPollingž   s    



z)ContinuousPollingTests.test_writerPollingc                 C   sT   t ƒ }t|ƒ}tƒ }dd„ |_| |¡ |  |jg ¡ | d¡ |  |jdg¡ dS )zu
        If a C{doRead} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        c                   S   s   t ƒ S r
   r   r   r   r   r   Ú<lambda>¸   ó    zBContinuousPollingTests.test_connectionLostOnRead.<locals>.<lambda>rB   r   N)r   r   r	   r   r&   r7   r   r?   r@   r   r   r   Útest_connectionLostOnRead°   s    


z0ContinuousPollingTests.test_connectionLostOnReadc                 C   sT   t ƒ }t|ƒ}tƒ }dd„ |_| |¡ |  |jg ¡ | d¡ |  |jdg¡ dS )zv
        If a C{doWrite} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        c                   S   s   t ƒ S r
   r   r   r   r   r   rD   Ç   rE   zCContinuousPollingTests.test_connectionLostOnWrite.<locals>.<lambda>rB   r   N)r   r   r	   r   r2   r7   r   r?   r@   r   r   r   Útest_connectionLostOnWrite¿   s    


z1ContinuousPollingTests.test_connectionLostOnWritec                 C   sš   t tƒ ƒ}tƒ }tƒ }tƒ }| |¡ | |¡ | |¡ | |¡ | ¡ }|  | ¡ g ¡ |  | ¡ g ¡ |  t	|ƒd¡ |  t
|ƒt
|||gƒ¡ dS )zv
        L{_ContinuousPolling.removeAll} removes all descriptors and returns
        the readers and writers.
        é   N)r   r   r#   r&   r2   Z	removeAllr7   Ú
getReadersÚ
getWritersr=   Úset)r   r.   r/   r4   ZbothZremovedr   r   r   Útest_removeAllÎ   s    




z%ContinuousPollingTests.test_removeAllc                 C   s.   t tƒ ƒ}tƒ }| |¡ |  || ¡ ¡ dS )zb
        L{_ContinuousPolling.getReaders} returns a list of the read
        descriptors.
        N)r   r   r#   r&   ÚassertInrI   r-   r   r   r   Útest_getReadersâ   s    

z&ContinuousPollingTests.test_getReadersc                 C   s.   t tƒ ƒ}tƒ }| |¡ |  || ¡ ¡ dS )zc
        L{_ContinuousPolling.getWriters} returns a list of the write
        descriptors.
        N)r   r   r#   r2   rM   rJ   r3   r   r   r   Útest_getWritersí   s    

z&ContinuousPollingTests.test_getWritersNz(epoll not supported in this environment.)r   r   r   r   r0   r5   r9   r;   r<   r>   rA   rC   rF   rG   rL   rN   rO   r   Úskipr   r   r   r   r    /   s    	
r    )r   Z
__future__r   r   Ztwisted.trial.unittestr   Ztwisted.internet.epollreactorr   ÚImportErrorZtwisted.internet.taskr   Ztwisted.internet.errorr   r#   r	   r    r   r   r   r   Ú<module>   s   
