Source code for rebootpy.auth

# -*- coding: utf-8 -*-

"""
MIT License

Copyright (c) 2019-2021 Terbau

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

import datetime
import asyncio
import logging
import uuid
import time
import webbrowser
import secrets
import base64

from random import randint
from aioconsole import ainput
from typing import TYPE_CHECKING, Optional, Any, List

from .errors import AuthException, HTTPException
from .typedefs import StrOrMaybeCoro
from .utils import from_iso, to_iso

if TYPE_CHECKING:
    from .client import BasicClient

log = logging.getLogger(__name__)
_prompt_lock = asyncio.Lock()


class Auth:
    def __init__(self, **kwargs: Any) -> None:
        self.ios_token = kwargs.get('ios_token', 'M2Y2OWU1NmM3NjQ5NDkyYzhjYzI5ZjFhZjA4YThhMTI6YjUxZWU5Y2IxMjIzNGY1MGE2OWVmYTY3ZWY1MzgxMmU=')  # noqa
        self.device_id = getattr(self, 'device_id', None) or uuid.uuid4().hex

        # It's recommended you only change this if you know what you're doing
        # as certain functions/API calls may start causing errors.
        self.access_token_type = kwargs.get('token_type', 'eg1')

    def initialize(self, client: 'BasicClient') -> None:
        self.client = client

        if not self.client.supports_non_eg1 and self.access_token_type != 'eg1':  # noqa
            raise ValueError(
                'Different token types are only supported for BasicClient'
            )

        self._refresh_event = asyncio.Event()
        self._refresh_lock = asyncio.Lock()
        self.refresh_i = 0

    @property
    def ios_authorization(self) -> str:
        return f'Bearer {self.ios_access_token}'

    @property
    def eas_authorization(self) -> str:
        return f'Bearer {self.eas_access_token}'
    
    @property
    def eos_authorization(self) -> str:
        return f'Bearer {self.eos_access_token}'

    @property
    def authorization(self) -> str:
        return self.ios_authorization

    @property
    def identifier(self) -> str:
        raise NotImplementedError

    def eula_check_needed(self) -> bool:
        return True

    async def authenticate(self, **kwargs) -> dict:
        raise NotImplementedError

    async def _authenticate(self, priority: int = 0) -> None:
        max_attempts = 3
        for i in range(max_attempts):
            try:
                log.info('Running authentication.')
                return await self.authenticate(priority=priority)
            except HTTPException as exc:
                codes = (
                    ('errors.com.epicgames.account.oauth.'
                     'exchange_code_not_found'),
                    ('errors.com.epicgames.account.oauth.'
                     'expired_exchange_code_session'),
                )
                if exc.message_code in codes:
                    if i != max_attempts-1:
                        continue

                raise
            except asyncio.CancelledError:
                return False

    async def reauthenticate(self, priority: int = 0) -> dict:
        raise NotImplementedError

    async def get_eula_version(self, **kwargs: Any) -> int:
        data = await self.client.http.eulatracking_get_data(**kwargs)
        return data['version'] if isinstance(data, dict) else 0

    async def accept_eula(self, **kwargs: Any) -> None:
        version = await self.get_eula_version(**kwargs)
        if version != 0:
            await self.client.http.eulatracking_accept(
                version,
                **kwargs
            )

            try:
                await self.client.http.fortnite_grant_access(**kwargs)
            except HTTPException as e:
                if e.message_code != 'errors.com.epicgames.bad_request':
                    raise

    def _update_ios_data(self, data: dict) -> None:
        self.ios_access_token = data['access_token']
        self.ios_expires_in = data['expires_in']
        self.ios_expires_at = from_iso(data["expires_at"])
        self.ios_token_type = data['token_type']
        self.ios_refresh_token = data['refresh_token']
        self.ios_refresh_expires = data.get('refresh_expires', 7200)
        self.ios_refresh_expires_at = data.get(
            'refresh_expires_at',
            datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=2)
        )
        self.ios_account_id = data['account_id']
        self.ios_client_id = data['client_id']
        self.ios_internal_client = data['internal_client']
        self.ios_client_service = data['client_service']
        self.ios_app = data['app']
        self.ios_in_app_id = data['in_app_id']

        self.account_id = self.ios_account_id

    def _update_eas_data(self, data: dict) -> None:
        self.eas_access_token = data['access_token']
        self.eas_expires_in = data['expires_in']
        self.eas_expires_at = from_iso(data["expires_at"])
        self.eas_token_type = data['token_type']
        self.eas_refresh_token = data['refresh_token']
        self.eas_client_id = data['client_id']
        self.eas_application_id = data['application_id']
        self.eas_scope = data['scope']

    def _update_eos_data(self, data: dict) -> None:
        self.eos_access_token = data['access_token']
        self.eos_expires_in = data['expires_in']
        self.eos_expires_at = from_iso(data["expires_at"])
        self.eos_token_type = data['token_type']
        self.eos_product_user_id = data['product_user_id']
        self.eos_organization_user_id = data['organization_user_id']
        self.eos_features = data['features']

    async def grant_refresh_token(self, refresh_token: str, auth_token: str, *,
                                  priority: int = 0) -> dict:
        payload = {
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token
        }

        return await self.client.http.account_oauth_grant(
            auth='basic {0}'.format(auth_token),
            data=payload,
            priority=priority
        )

    async def grant_eas_refresh_token(self,
                                      refresh_token: str,
                                      priority: int = 0) -> dict:
        payload  = {
            "grant_type": "refresh_token",
            "scope": "basic_profile friends_list presence openid",
            "refresh_token": refresh_token,
            "deployment_id": "62a9473a2dca46b29ccf17577fcf42d7"
        }

        return await self.client.http.eas_token_oauth_grant(
            auth=f'basic {self.ios_token}',
            data=payload,
            priority=priority
        )
    
    async def grant_eos_external_auth_token(
        self,
        external_auth_token: str,
        priority: int = 0
    ) -> dict:

        payload  = {
            "grant_type": "external_auth",
            "external_auth_type": "epicgames_access_token",
            "external_auth_token": external_auth_token,
            "deployment_id": "62a9473a2dca46b29ccf17577fcf42d7",
            "nonce": base64.urlsafe_b64encode(secrets.token_bytes(22)).decode('utf-8').rstrip('=')
        }
        try:
            return await self.client.http.eos_token_oauth_grant(
                auth=f'basic {self.ios_token}',
                data=payload,
                priority=priority
            )
        except HTTPException as e:
            if e.message_code == "errors.com.epicgames.eos.auth.user_not_found":
                continuation_token = e.raw.get('continuation_token')
                return await self.client.http.eos_token_oauth_continuation(
                    auth=f'Bearer {continuation_token}',
                    priority=priority
                )
            raise


    async def get_exchange_code(self, *,
                                auth='IOS_ACCESS_TOKEN',
                                priority: int = 0) -> str:
        data = await self.client.http.account_get_exchange_data(
            auth=auth,
            priority=priority
        )
        return data['code']

    async def exchange_code_for_session(self, token: str, code: str, *,
                                        priority: int = 0) -> dict:
        payload = {
            'grant_type': 'exchange_code',
            'exchange_code': code,
            'token_type': self.access_token_type,
        }

        return await self.client.http.account_oauth_grant(
            auth='basic {0}'.format(token),
            data=payload,
            priority=priority
        )

    async def get_ios_client_credentials(self):
        payload = {
            'grant_type': 'client_credentials'
        }

        return await self.client.http.account_oauth_grant(
            auth='IOS_BASIC_TOKEN',
            data=payload
        )

    async def kill_token(self, token: str) -> None:
        await self.client.http.account_sessions_kill_token(
            token,
            auth='Bearer {0}'.format(token)
        )

    async def kill_other_sessions(self, auth: str = 'IOS_ACCESS_TOKEN', *,
                                  priority: int = 0) -> None:
        await self.client.http.account_sessions_kill(
            'OTHERS_ACCOUNT_CLIENT_SERVICE',
            auth=auth,
            priority=priority
        )
        log.debug('Killing other sessions')

    def refresh_loop_running(self):
        try:
            task = self.client._refresh_task
        except AttributeError:
            return False

        return task is not None and not task.cancelled()

    async def schedule_token_refresh(self) -> None:
        expires = [
            self.ios_expires_at,
            self.eas_expires_at,
            self.eos_expires_at
        ]

        min_expires_at = min(expires)

        subtracted = min_expires_at - datetime.datetime.now(datetime.timezone.utc)
        self.token_timeout = (subtracted).total_seconds() - 300
        await asyncio.sleep(self.token_timeout)

    async def run_refresh_loop(self) -> None:
        loop = self.client.loop

        while True:
            self._refresh_event.clear()
            _, p = await asyncio.wait((
                loop.create_task(self._refresh_event.wait()),
                loop.create_task(self.schedule_token_refresh())
            ), return_when=asyncio.FIRST_COMPLETED)

            for pending in p:
                if not pending.cancelled():
                    pending.cancel()

            await self.do_refresh()

    async def do_refresh(self) -> None:
        reauth_lock = self.client._reauth_lock
        reauth_lock.priority += 1
        forced = reauth_lock.locked()

        try:
            if not forced:
                await self._refresh_lock.acquire()

            log.debug('Refreshing session')
            self.client._refresh_times.append(time.time())

            try:
                data = await self.grant_refresh_token(
                    self.ios_refresh_token,
                    self.ios_token,
                    priority=reauth_lock.priority
                )
                self._update_ios_data(data)

                data = await self.grant_eas_refresh_token(
                    self.eas_refresh_token,
                    priority=reauth_lock.priority
                )
                self._update_eas_data(data)

                data = await self.grant_eos_external_auth_token(
                    self.eas_access_token,
                    priority=reauth_lock.priority
                )
                self._update_eos_data(data)
            except (HTTPException, AttributeError) as exc:
                m = 'errors.com.epicgames.account.auth_token.' \
                    'invalid_refresh_token'
                if isinstance(exc, HTTPException) and exc.message_code != m:
                    raise

                log.debug(
                    'Invalid refresh token was supplied. Attempting to '
                    'reconnect if possible.'
                )
                try:
                    await self.reauthenticate(
                        priority=reauth_lock.priority
                    )
                except NotImplementedError:
                    raise exc

                log.debug('Successfully reauthenticated.')

            await self.client.dispatch_and_wait_event('internal_auth_refresh')

            self.refresh_i += 1
            log.debug('Sessions was successfully refreshed.')
            self.client.dispatch_event('auth_refresh')

        finally:
            if not forced:
                self._refresh_lock.release()

    async def run_refresh(self) -> None:
        self._refresh_event.set()
        await self.client.wait_for('auth_refresh')

    def refreshing(self) -> bool:
        return self._refresh_lock.locked()

    async def fetch_device_auths(self) -> List[dict]:
        # Return payload:
        # [
        #     {
        #         "deviceId": "01fbc14162634294a1db59ddced3c10c",
        #         "accountId": "4a6313cbbe7d432b8132b5ee67ad53fa",
        #         "userAgent": "EpicGamesLauncher/++Fortnite+Release-12.00-CL-11586896 Windows/10.0.17134.1.768.64bit",  # noqa
        #         "created": {
        #             "location": "Oslo, Norway",
        #             "ipAddress": "",  # ipv4 address
        #             "dateTime": "2020-04-25T20:38:57.570Z"
        #         },
        #         "lastAccess": {
        #             "location": "Oslo, Norway",
        #             "ipAddress": "",  # ipv4 address
        #             "dateTime": "2020-05-13T16:33:43.075Z"
        #         }
        #     }
        # ]
        return await self.client.http.account_get_device_auths(
            self.ios_account_id
        )

    async def generate_device_auth(self) -> dict:
        # Return payload:
        # {
        #     "deviceId": "e2ba6aa72411468ba4fee016086809d7",
        #     "accountId": "4a6313cbbe7d432b8132b5ee67ad53fa",
        #     "secret": "",  # 32 char (not hex)
        #     "userAgent": "EpicGamesLauncher/++Fortnite+Release-12.00-CL-11586896 Windows/10.0.17134.1.768.64bit",  # noqa
        #     "created": {
        #         "location": "Oslo, Norway",
        #         "ipAddress": "",  # ipv4
        #         "dateTime": "2020-05-13T18:32:07.601Z"
        #     }
        # }
        return await self.client.http.account_generate_device_auth(
            self.ios_account_id
        )

    async def delete_device_auth(self, device_id: str) -> None:
        await self.client.http.account_delete_device_auth(
            self.ios_account_id,
            device_id
        )

[docs] class ExchangeCodeAuth(Auth): """Authenticates by an exchange code. .. note:: The method to get an exchange code has been significantly harder since epic patched the old method of copying the code from one of their own endpoints that could be requested easily in a browser. To obtain an exchange code it is recommended to provide a custom solution like running a selenium process where you log in on https://epicgames.com and then redirect to /id/api/exchange/generate. You can then return the exchange code. You can put this solution in a function and then pass this to ``exchange_code``. .. note:: An exchange code only works for a single login within a short timeframe (300 seconds). Therefore you need to get a new code for each login. You can get a new code by refreshing the site. Parameters ---------- code: Union[:class:`str`, Callable, Awaitable] The exchange code or a function/coroutine that when called returns the exchange code. device_id: Optional[:class:`str`] A 32 char hex string representing your device. ios_token: Optional[:class:`str`] The main Fortnite token to use with authentication. You should generally not need to set this manually. Attributes ---------- authorization: :class:`str` The Authorization header for use with Fortnite endpoints, use this if you're making HTTP requests that aren't already implemented. """ def __init__(self, code: StrOrMaybeCoro, **kwargs: Any) -> None: super().__init__(**kwargs) self.code = code self.resolved_code = None async def resolve(self, code: StrOrMaybeCoro) -> str: if isinstance(code, str): return code elif asyncio.iscoroutinefunction(code): res = await code() else: res = code() if not isinstance(res, str): raise TypeError('Return type of callable func/coro must be str') return res @property def identifier(self) -> str: return self.resolved_code async def ios_authenticate(self, priority: int = 0) -> dict: log.info('Exchanging code.') self.resolved_code = await self.resolve(self.code) try: data = await self.exchange_code_for_session( self.ios_token, self.resolved_code ) except HTTPException as e: m = 'errors.com.epicgames.account.oauth.exchange_code_not_found' if e.message_code == m: raise AuthException( 'Invalid exchange code supplied', e ) from e raise return data async def authenticate(self, priority: int = 0, **kwargs) -> None: data = await self.ios_authenticate(priority=priority) self._update_ios_data(data) if self.client.kill_other_sessions: await self.kill_other_sessions(priority=priority) eas_data, *_ = await asyncio.gather( self.grant_eas_refresh_token( self.ios_refresh_token, priority=priority ), self.client._setup_client_user(priority=priority), self.client._fetch_user_agent(priority=priority) ) self._update_eas_data(eas_data) eos_data = await self.grant_eos_external_auth_token( self.eas_access_token, priority=priority ) self._update_eos_data(eos_data)
[docs] class AuthorizationCodeAuth(ExchangeCodeAuth): """Authenticates by exchange code. You can get the code from `here <https://www.epicgames.com/id/api/redirect? clientId=3f69e56c7649492c8cc29f1af08a8a12&responseType=code>`_ by logging in and copying the code from the redirectUrl's query parameters. If you are already logged in and want to change accounts, simply log out at https://www.epicgames.com, log in to the new account and then enter the link above again to generate an authorization code. **How to get an authorization code:** .. image:: https://raw.githubusercontent.com/xMistt/rebootpy/main/docs/resources/images/authorization_code.png .. note:: An authorization code only works for a single login within a short timeframe (300 seconds). Therefore you need to get a new code for each login. You can get a new code by refreshing the site. Parameters ---------- code: Union[:class:`str`, Callable, Awaitable] The authorization code or a function/coroutine that when called returns the authorization code. device_id: Optional[:class:`str`] A 32 char hex string representing your device. ios_token: Optional[:class:`str`] The main Fortnite token to use with authentication. You should generally not need to set this manually. Attributes ---------- authorization: :class:`str` The Authorization header for use with Fortnite endpoints, use this if you're making HTTP requests that aren't already implemented. """ def __init__(self, code: StrOrMaybeCoro, **kwargs: Any) -> None: super().__init__(code, **kwargs) async def ios_authenticate(self, priority: int = 0) -> dict: self.resolved_code = await self.resolve(self.code) payload = { 'grant_type': 'authorization_code', 'code': self.resolved_code, } try: data = await self.client.http.account_oauth_grant( auth='basic {0}'.format(self.ios_token), data=payload, priority=priority ) except HTTPException as e: m = 'errors.com.epicgames.account.oauth.authorization_code_not_found' # noqa if e.message_code == m: raise AuthException( 'Invalid authorization code supplied', e ) from e raise return data
[docs] class DeviceAuth(Auth): """Authenticate with device auth details. .. note:: When an account's password is reset, all device authentications associated with the account are removed. If your device ID and secret are compromised, resetting your password will invalidate all authentication data, making it useless to anyone who may have accessed the leaked data. Parameters ---------- device_id: :class:`str` The device id. account_id: :class:`str` The account's id. secret: :class:`str` The secret. ios_token: Optional[:class:`str`] The main Fortnite token to use with authentication. You should generally not need to set this manually. Attributes ---------- authorization: :class:`str` The Authorization header for use with Fortnite endpoints, use this if you're making HTTP requests that aren't already implemented. """ def __init__(self, device_id: str, account_id: str, secret: str, **kwargs: Any) -> None: super().__init__(**kwargs) self.device_id = device_id self.account_id = account_id self.secret = secret @property def identifier(self) -> str: return self.account_id def eula_check_needed(self) -> bool: return False async def ios_authenticate(self, priority: int = 0) -> dict: payload = { 'grant_type': 'device_auth', 'device_id': self.device_id, 'account_id': self.account_id, 'secret': self.secret, 'token_type': self.access_token_type } try: data = await self.client.http.account_oauth_grant( auth='basic {0}'.format(self.ios_token), data=payload, priority=priority ) except HTTPException as exc: m = 'errors.com.epicgames.account.invalid_account_credentials' if exc.message_code == m: raise AuthException( 'Invalid device auth details passed.', exc ) from exc if exc.message_code == 'errors.com.epicgames.oauth.corrective_action_required': action = exc.raw.get('correctiveAction') log.debug("Corrective action is required: " + action) if action == 'DATE_OF_BIRTH': client_credentials = await self.get_ios_client_credentials() client_access_token = client_credentials.get('access_token') random_date = "{:04d}-{:02d}-{:02d}".format(randint(1990, 2002), randint(1, 12), randint(1, 28)) await self.client.http.account_put_date_of_birth_correction( continuation=exc.raw.get('continuation'), date_of_birth=random_date, auth='Bearer {0}'.format(client_access_token) ) return await self.ios_authenticate(priority) raise AuthException( 'Required corrective action {} is not supported'.format(action), exc ) from exc raise return data async def authenticate(self, priority: int = 0) -> None: data = await self.ios_authenticate(priority=priority) self._update_ios_data(data) if self.client.kill_other_sessions: await self.kill_other_sessions(priority=priority) eas_data, *_ = await asyncio.gather( self.grant_eas_refresh_token( self.ios_refresh_token, priority=priority ), self.client._setup_client_user(priority=priority), self.client._fetch_user_agent(priority=priority) ) self._update_eas_data(eas_data) eos_data = await self.grant_eos_external_auth_token( self.eas_access_token, priority=priority ) self._update_eos_data(eos_data) async def reauthenticate(self, priority: int = 0) -> None: """Used for reauthenticating if refreshing fails.""" log.debug('Starting reauthentication.') ret = await self.authenticate(priority=priority) log.debug('Successfully reauthenicated.') return ret
[docs] class RefreshTokenAuth(Auth): """Authenticates by the passed launcher refresh token. Parameters ---------- refresh_token: :class:`str` A valid launcher refresh token. Attributes ---------- authorization: :class:`str` The Authorization header for use with Fortnite endpoints, use this if you're making HTTP requests that aren't already implemented. """ def __init__(self, refresh_token: str, **kwargs: Any) -> None: super().__init__(**kwargs) self._refresh_token = refresh_token @property def identifier(self) -> str: return self._refresh_token def eula_check_needed(self) -> bool: return False async def ios_authenticate(self, priority: int = 0) -> dict: data = await self.grant_refresh_token( self._refresh_token, self.ios_token, priority=priority ) return data async def authenticate(self, priority: int = 0) -> None: data = await self.ios_authenticate(priority=priority) self._update_ios_data(data) if self.client.kill_other_sessions: await self.kill_other_sessions(priority=priority) eas_data, *_ = await asyncio.gather( self.grant_eas_refresh_token( self.ios_refresh_token, priority=priority ), self.client._setup_client_user(priority=priority), self.client._fetch_user_agent(priority=priority) ) self._update_eas_data(eas_data) eos_data = await self.grant_eos_external_auth_token( self.eas_access_token, priority=priority ) self._update_eos_data(eos_data)
[docs] class AdvancedAuth(Auth): """Authenticates by the available data in the following order: 1. By :class:`DeviceAuth` if ``device_id``, ``account_id`` and ``secret`` are present. 2. :class:`DeviceCodeAuth` is tried if ``prompt_device_code`` is ``True``. 3. :class:`ExchangeCodeAuth` is tried if ``exchange_code`` is present or if ``prompt_exchange_code`` is ``True``. 4. :class:`AuthorizationCodeAuth` is tried if ``authorization_code`` is present or if ``prompt_authorization_code`` is ``True``. If the authentication was not done by step 1, a device auth is automatically generated and the details will be dispatched to :func:`event_device_auth_generate`. It is important to store these values somewhere since they can be used for easier logins. If you'd like to deal with the device code link yourself instead of it being printed to console, you can use :func:`event_device_code_generated()`. Parameters ---------- exchange_code: Optional[Union[:class:`str`, Callable, Awaitable]] The exchange code or a function/coroutine that when called returns the exchange code. authorization_code: Optional[Union[:class:`str`, Callable, Awaitable]] The authorization code or a function/coroutine that when called returns the authorization code. device_id: Optional[:class:`str`] The device id to use for the login. account_id: Optional[:class:`str`] The account id to use for the login. secret: Optional[:class:`str`] The secret to use for the login. prompt_device_code: :class:`bool` If this is set to ``True`` and no exchange code is passed, you will be prompted to enter the exchange code in the console if needed. .. note:: Both ``prompt_exchange_code`` and ``prompt_authorization_code`` cannot be True at the same time. prompt_exchange_code: :class:`bool` If this is set to ``True`` and no exchange code is passed, you will be prompted to enter the exchange code in the console if needed. .. note:: Both ``prompt_exchange_code`` and ``prompt_authorization_code`` cannot be True at the same time. prompt_authorization_code: :class:`bool` If this is set to ``True`` and no authorization code is passed, you will be prompted to enter the authorization code in the console if needed. .. note:: Both ``prompt_exchange_code`` and ``prompt_authorization_code`` cannot be True at the same time. prompt_code_if_invalid: :class:`bool` Whether or not to prompt a code if the device auth details was invalid. If this is False then the regular :exc:`AuthException` is raised instead. .. note:: This only works if ``prompt_exchange_code`` or ``prompt_authorization_code`` is ``True``. prompt_code_if_throttled: :class:`bool` If this is set to ``True`` and you receive a throttling response, you will be prompted to enter a code in the console. .. note:: This only works if ``prompt_exchange_code`` or ``prompt_authorization_code`` is ``True``. delete_existing_device_auths: :class:`bool` Whether or not to delete all existing device auths when a new is created. ios_token: Optional[:class:`str`] The main Fortnite token to use with authentication. You should generally not need to set this manually. Attributes ---------- authorization: :class:`str` The Authorization header for use with Fortnite endpoints, use this if you're making HTTP requests that aren't already implemented. """ def __init__(self, exchange_code: Optional[StrOrMaybeCoro] = None, authorization_code: Optional[StrOrMaybeCoro] = None, device_id: Optional[str] = None, account_id: Optional[str] = None, secret: Optional[str] = None, prompt_exchange_code: bool = False, prompt_authorization_code: bool = False, prompt_device_code: bool = True, open_link_in_browser: bool = True, prompt_code_if_invalid: bool = False, prompt_code_if_throttled: bool = False, delete_existing_device_auths: bool = False, **kwargs: Any) -> None: super().__init__(**kwargs) self.exchange_code = exchange_code self.authorization_code = authorization_code self.device_id = device_id self.account_id = account_id self.secret = secret self.delete_existing_device_auths = delete_existing_device_auths self.prompt_exchange_code = prompt_exchange_code self.prompt_authorization_code = prompt_authorization_code self.prompt_device_code = prompt_device_code self.open_link_in_browser = open_link_in_browser if self.prompt_exchange_code and self.prompt_authorization_code: raise ValueError('Both prompt_exchange_code and ' 'prompt_authorization_code cannot be True at ' 'the same time.') self.prompt_code_if_invalid = prompt_code_if_invalid self.prompt_code_if_throttled = prompt_code_if_throttled self.kwargs = kwargs self._used_auth = None @property def identifier(self) -> str: return self.account_id or self.authorization_code or self.exchange_code def eula_check_needed(self) -> bool: return self._used_auth.eula_check_needed() def exchange_code_ready(self) -> bool: return self.exchange_code is not None def authorization_code_ready(self) -> bool: return self.authorization_code is not None def code_ready(self) -> bool: return self.exchange_code_ready() or self.authorization_code_ready() def device_auth_ready(self) -> bool: return self.device_id and self.account_id and self.secret def prompt_enabled(self) -> bool: return True in (self.prompt_exchange_code, self.prompt_authorization_code) # noqa def get_prompt_type_name(self) -> str: if self.prompt_exchange_code: return 'exchange' elif self.prompt_authorization_code: return 'authorization' async def run_exchange_code_authenticate(self) -> dict: auth = ExchangeCodeAuth( code=self.exchange_code, **self.kwargs ) auth.initialize(self.client) self._used_auth = auth return await auth.ios_authenticate() async def run_authorization_code_authenticate(self) -> dict: auth = AuthorizationCodeAuth( code=self.authorization_code, **self.kwargs ) auth.initialize(self.client) self._used_auth = auth return await auth.ios_authenticate() async def run_device_code_authenticate(self) -> dict: auth = DeviceCodeAuth( open_link_in_browser=self.open_link_in_browser ) auth.initialize(self.client) self._used_auth = auth return await auth.ios_authenticate() async def run_device_authenticate(self, device_id: Optional[str] = None, account_id: Optional[str] = None, secret: Optional[str] = None, *, priority: int = 0 ) -> dict: auth = DeviceAuth( device_id=device_id or self.device_id, account_id=account_id or self.account_id, secret=secret or self.secret, **self.kwargs ) auth.initialize(self.client) self._used_auth = auth return await auth.ios_authenticate(priority=priority) async def ios_authenticate(self, priority: int = 0) -> dict: data = None prompt_message = '' if self.device_auth_ready(): try: return await self.run_device_authenticate() except AuthException as exc: original = exc.original if not self.prompt_enabled() or not self.prompt_code_if_invalid: # noqa raise if isinstance(original, HTTPException): m = 'errors.com.epicgames.account.invalid_account_credentials' # noqa if original.message_code != m: raise prompt_message = 'Invalid device auth details passed. ' if self.prompt_device_code: try: data = await self.run_device_code_authenticate() except AuthException as exc: original = exc.original if not self.prompt_enabled() or not self.prompt_code_if_invalid: # noqa raise if data is None: prompted = False code = None if not self.code_ready() and self.prompt_enabled(): prompted = True code_type = self.get_prompt_type_name() text = '{0}Please enter a valid {1} code.\n'.format( prompt_message, code_type ) async with _prompt_lock: code = await ainput( text, ) if (prompted and self.prompt_exchange_code) or self.exchange_code_ready(): # noqa self.exchange_code = code or self.exchange_code data = await self.run_exchange_code_authenticate() else: self.authorization_code = code or self.authorization_code data = await self.run_authorization_code_authenticate() self._update_ios_data(data) if self.delete_existing_device_auths: tasks = [] auths = await self.fetch_device_auths() for auth in auths: tasks.append(self.client.loop.create_task( self.delete_device_auth( auth['deviceId'] ) )) if tasks: await asyncio.gather(*tasks) data = await self.generate_device_auth() details = { 'device_id': data['deviceId'], 'account_id': data['accountId'], 'secret': data['secret'], } self.__dict__.update(details) self.client.dispatch_event( 'device_auth_generate', details ) return data async def authenticate(self, **kwargs) -> None: data = await self.ios_authenticate() self._update_ios_data(data) if self.client.kill_other_sessions: await self.kill_other_sessions() eas_data, *_ = await asyncio.gather( self.grant_eas_refresh_token( self.ios_refresh_token, ), self.client._setup_client_user(), self.client._fetch_user_agent() ) self._update_eas_data(eas_data) eos_data = await self.grant_eos_external_auth_token( self.eas_access_token ) self._update_eos_data(eos_data) async def reauthenticate(self, priority: int = 0) -> None: log.debug('Starting reauthentication.') await self.run_device_authenticate( device_id=self.device_id, account_id=self.account_id, secret=self.secret, priority=priority, ) if self.client.kill_other_sessions: await self.kill_other_sessions(priority=priority) data = await self.grant_eas_refresh_token( self.ios_refresh_token, priority=priority ) self._update_eas_data(data) data = await self.grant_eos_external_auth_token( self.eas_access_token, priority=priority ), self._update_eos_data(data) log.debug('Successfully reauthenticated.')
[docs] class DeviceCodeAuth(Auth): """Authenticate with device code. If you'd like to deal with the device code link yourself instead of it being printed to console, you can use :func:`event_device_code_generated()`. Parameters ---------- open_link_in_browser: :class:`bool` Whether or not to automatically open the Epic Games login in the default browser. ios_token: Optional[:class:`str`] The main Fortnite token to use with authentication. You should generally not need to set this manually. switch_token: Optional[:class:`str`] The switch token to use with authentication. You should generally not need to set this manually. Attributes ---------- authorization: :class:`str` The Authorization header for use with Fortnite endpoints, use this if you're making HTTP requests that aren't already implemented. """ def __init__(self, open_link_in_browser: bool = True, **kwargs: Any) -> None: super().__init__(**kwargs) self.switch_token = kwargs.get('switch_token', 'OThmN2U0MmMyZTNhNGY4NmE3NGViNDNmYmI0MWVkMzk6MGEyNDQ5YTItMDAxYS00NTFlLWFmZWMtM2U4MTI5MDFjNGQ3') #noqa self.open_link_in_browser = open_link_in_browser @property def identifier(self) -> str: return self.account_id def eula_check_needed(self) -> bool: return False async def ios_authenticate(self, priority: int = 0) -> dict: switch_token = await self.client.http.account_oauth_grant( auth=f'basic {self.switch_token}', data={ 'grant_type': 'client_credentials' }, priority=priority ) device_code = await self.client.http.account_create_device_code( auth=f'Bearer {switch_token["access_token"]}', headers={ "Content-Type": "application/x-www-form-urlencoded" }, priority=priority ) if self.open_link_in_browser: webbrowser.open(device_code['verification_uri_complete'], new=1) if self.client._event_has_destination('device_code_generated'): self.client.dispatch_event( 'device_code_generated', device_code['verification_uri_complete'] ) else: print( f"Please login via {device_code['verification_uri_complete']}" ) while True: try: exchange_access_token = await self.client.http.account_oauth_grant( auth=f'basic {self.switch_token}', data={ "grant_type": "device_code", "device_code": device_code['device_code'], 'token_type': self.access_token_type }, headers={ "Content-Type": "application/x-www-form-urlencoded" }, priority=priority ) if 'access_token' in exchange_access_token: break except HTTPException as exc: if exc.message_code == 'errors.com.epicgames.account.oauth.authorization_pending': pass elif exc.message_code == 'errors.com.epicgames.not_found': pass elif exc.message_code == 'errors.com.epicgames.oauth.corrective_action_required': action = exc.raw.get('correctiveAction') log.debug("Corrective action is required: " + action) if action == 'DATE_OF_BIRTH': client_credentials = await self.get_ios_client_credentials() client_access_token = client_credentials.get('access_token') random_date = "{:04d}-{:02d}-{:02d}".format(randint(1990, 2002), randint(1, 12), randint(1, 28)) await self.client.http.account_put_date_of_birth_correction( continuation=exc.raw.get('continuation'), date_of_birth=random_date, auth='Bearer {0}'.format(client_access_token) ) return await self.ios_authenticate(priority) raise AuthException( 'Required corrective action {} is not supported'.format(action), exc ) from exc else: raise AuthException(f'Unknown error when checking device code - {exc.message_code}', exc) await asyncio.sleep(10) exchange_code = await self.client.http.account_get_exchange_data( auth=f"Bearer {exchange_access_token['access_token']}", priority=priority ) data = await self.client.http.account_oauth_grant( auth=f'basic {self.ios_token}', data={ "grant_type": "exchange_code", "exchange_code": exchange_code['code'], 'token_type': self.access_token_type }, priority=priority ) return data async def authenticate(self, priority: int = 0) -> None: data = await self.ios_authenticate(priority=priority) self._update_ios_data(data) if self.client.kill_other_sessions: await self.kill_other_sessions(priority=priority) eas_data, *_ = await asyncio.gather( self.grant_eas_refresh_token( self.ios_refresh_token, priority=priority ), self.client._setup_client_user(priority=priority), self.client._fetch_user_agent(priority=priority) ) self._update_eas_data(eas_data) eos_data = await self.grant_eos_external_auth_token( self.eas_access_token, priority=priority ) self._update_eos_data(eos_data) async def reauthenticate(self, priority: int = 0) -> None: """Used for reauthenticating if refreshing fails.""" log.debug('Starting reauthentication.') ret = await self.authenticate(priority=priority) log.debug('Successfully reauthenicated.') return ret