U
    
W[-  ć                   @   s¤   d Z ddlmZmZ ddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ edkrbdd	lmZ ndd
lmZ G dd deZG dd deZG dd dejZdS )zK
Tests for the internal implementation details of L{twisted.internet.udp}.
é    )ŚdivisionŚabsolute_importN)Śunittest)ŚDatagramProtocol)Śudp)ŚplatformTypeZwin32)ŚWSAEWOULDBLOCK)ŚEWOULDBLOCKc                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	ŚStringUDPSocketa  
    A fake UDP socket object, which returns a fixed sequence of strings and/or
    socket errors.  Useful for testing.

    @ivar retvals: A C{list} containing either strings or C{socket.error}s.

    @ivar connectedAddr: The address the socket is connected to.
    c                 C   s   || _ d | _d S ©N)ŚretvalsŚconnectedAddr)Śselfr   © r   śJ/usr/lib/python3/dist-packages/twisted/internet/test/test_udp_internals.pyŚ__init__"   s    zStringUDPSocket.__init__c                 C   s
   || _ d S r   )r   )r   Śaddrr   r   r   Śconnect'   s    zStringUDPSocket.connectc                 C   s$   | j  d”}t|tjr||dfS )zH
        Return (or raise) the next value from C{self.retvals}.
        r   N)r   ŚpopŚ
isinstanceŚsocketŚerror)r   ŚsizeZretr   r   r   Śrecvfrom+   s    zStringUDPSocket.recvfromN)Ś__name__Ś
__module__Ś__qualname__Ś__doc__r   r   r   r   r   r   r   r
      s   	r
   c                   @   s    e Zd ZdZdd Zdd ZdS )Ś	KeepReadsz%
    Accumulate reads in a list.
    c                 C   s
   g | _ d S r   )Śreads)r   r   r   r   r   ;   s    zKeepReads.__init__c                 C   s   | j  |” d S r   )r   Śappend)r   Śdatar   r   r   r   ŚdatagramReceived?   s    zKeepReads.datagramReceivedN)r   r   r   r   r   r"   r   r   r   r   r   6   s   r   c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )ŚErrorsTestsz/
    Error handling tests for C{udp.Port}.
    c                 C   s   t j d” |  t jjd” t }t  d|”}tddt 	d”dt 	d”g|_| 
”  |  |jddg” | 
”  |  |jdddg” dS )z¤
        Socket reads with some good data followed by a socket error which can
        be ignored causes reading to stop, and no log messages to be logged.
        iØä’’Ns   results   123s   456)r   Z_sockErrReadIgnorer    Ś
addCleanupŚremover   ŚPortr
   r   r   ŚdoReadŚassertEqualr   ©r   ZprotocolZportr   r   r   Śtest_socketReadNormalI   s    ’’z!ErrorsTests.test_socketReadNormalc                 C   s   t j d” |  t jjd” t }dd |_t  d|”}tdt	 
d”dt	 
t”g|_	| ”  |  |jdg” | ”  |  |jddg” dS )zĖ
        If the socket is unconnected, socket reads with an immediate
        connection refusal are ignored, and reading stops. The protocol's
        C{connectionRefused} method is not called.
        éč’’c                   S   s   dd S )Né   r   r   r   r   r   r   Ś<lambda>m   ó    z5ErrorsTests.test_readImmediateError.<locals>.<lambda>Nó   aó   b)r   Ś_sockErrReadRefuser    r$   r%   r   ŚconnectionRefusedr&   r
   r   r   r	   r'   r(   r   r)   r   r   r   Śtest_readImmediateError`   s    
’z#ErrorsTests.test_readImmediateErrorc                    s¼   t j d” |  t jjd” t }g   fdd|_t  d|”}tdt	 
d”dt	 
t”g|_	| dd” | ”  |  |jdg” |   d	g” | ”  |  |jddg” |   d	g” dS )
zĀ
        If the socket connected, socket reads with an immediate
        connection refusal are ignored, and reading stops. The protocol's
        C{connectionRefused} method is called.
        r+   c                      s
      d”S )NT)r    r   ©Zrefusedr   r   r-      r.   z>ErrorsTests.test_connectedReadImmediateError.<locals>.<lambda>Nr/   r0   z	127.0.0.1i'  T)r   r1   r    r$   r%   r   r2   r&   r
   r   r   r	   r   r'   r(   r   r)   r   r4   r   Ś test_connectedReadImmediateError|   s     ’z,ErrorsTests.test_connectedReadImmediateErrorc                 C   sJ   t  }t d|”}tdt d”g|_|  tj|j” |  |j	dg” dS )zG
        Socket reads with an unknown socket error are raised.
        Ns   goodiĒś’’)
r   r   r&   r
   r   r   ZassertRaisesr'   r(   r   r)   r   r   r   Śtest_readUnknownError   s
    z!ErrorsTests.test_readUnknownErrorN)r   r   r   r   r*   r3   r5   r6   r   r   r   r   r#   D   s
   r#   )r   Z
__future__r   r   r   Ztwisted.trialr   Ztwisted.internet.protocolr   Ztwisted.internetr   Ztwisted.python.runtimer   Śerrnor   r	   Śobjectr
   r   ZSynchronousTestCaser#   r   r   r   r   Ś<module>   s   