PK ! \" " google_music/__about__.py__all__ = [
'__author__',
'__author_email__',
'__copyright__',
'__license__',
'__summary__',
'__title__',
'__url__',
'__version__',
'__version_info__'
]
__title__ = 'google-music'
__summary__ = 'A Google Music API wrapper.'
__url__ = 'https://github.com/thebigmunch/google-music'
__version__ = '3.0.1'
__version_info__ = tuple(int(i) for i in __version__.split('.') if i.isdigit())
__author__ = 'thebigmunch'
__author_email__ = 'mail@thebigmunch.me'
__license__ = 'MIT'
__copyright__ = f'2018-2019 {__author__} <{__author_email__}>'
PK ! i google_music/__init__.pyfrom .__about__ import *
from .api import *
from .clients import *
__all__ = [
*__about__.__all__,
*api.__all__,
*clients.__all__
]
PK ! fLk k google_music/api.py__all__ = ['mobileclient', 'musicmanager']
from .clients import MobileClient, MusicManager
def mobileclient(username=None, device_id=None, *, token=None, locale='en_US'):
"""Create and authenticate a Google Music mobile client.
>>> import google_music
>>> mc = google_music.mobileclient('username')
Parameters:
username (str, Optional): Your Google Music username.
This is used to store OAuth credentials for different accounts separately.
device_id (str, Optional): A mobile device ID. Default: MAC address is used.
token (dict, Optional): An OAuth token compatible with ``requests-oauthlib``.
locale (str, Optional): `ICU `__ locale used to localize some
responses. This must be a locale supported by Android. Default: `'en_US'``.
Returns:
MobileClient: An authenticated :class:`~google_music.MobileClient` instance.
"""
return MobileClient(
username,
device_id,
token=token,
locale=locale
)
def musicmanager(username=None, uploader_id=None, *, token=None):
"""Create and authenticate a Google Music Music Manager client.
>>> import google_music
>>> mm = google_music.musicmanager('username')
Parameters:
username (str, Optional): Your Google Music username.
This is used to store OAuth credentials for different accounts separately.
device_id (str, Optional): A mobile device ID. Default: MAC address is used.
token (dict, Optional): An OAuth token compatible with ``requests-oauthlib``.
Returns:
MusicManager: An authenticated :class:`~google_music.MusicManager` instance.
"""
return MusicManager(
username,
uploader_id,
token=token
)
PK ! "=v v google_music/clients/__init__.pyfrom .mobileclient import *
from .musicmanager import *
__all__ = [
*mobileclient.__all__,
*musicmanager.__all__
]
PK ! Hz google_music/clients/base.pyimport time
import requests
from google_music_proto.oauth import AUTHORIZATION_BASE_URL, REDIRECT_URI, TOKEN_URL
from tenacity import retry, stop_after_attempt, wait_exponential
from ..session import GoogleMusicSession, dump_token, load_token
# TODO: Configurable token updater/saver/loader.
class GoogleMusicClient:
def _oauth(self, username, *, token=None):
auto_refresh_kwargs = {
'client_id': self.client_id,
'client_secret': self.client_secret
}
self.session = GoogleMusicSession(
client_id=self.client_id,
scope=self.oauth_scope,
redirect_uri=REDIRECT_URI,
auto_refresh_url=TOKEN_URL,
auto_refresh_kwargs=auto_refresh_kwargs,
token_updater=self._update_token
)
if not token:
try:
token = load_token(username, self.client)
token['expires_at'] = time.time() - 10
except FileNotFoundError:
authorization_url, state = self.session.authorization_url(
AUTHORIZATION_BASE_URL,
access_type='offline',
prompt='select_account'
)
code = input(
f"Visit:\n\n{authorization_url}\n\nFollow the prompts and paste provided code: "
)
token = self.session.fetch_token(
TOKEN_URL,
client_secret=self.client_secret,
code=code
)
self.token = self.session.refresh_token(
TOKEN_URL,
refresh_token=token.get('refresh_token')
)
self._update_token()
@retry(
reraise=True,
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, max=10)
)
def _call(self, call_cls, *args, **kwargs):
call = call_cls(*args, **kwargs)
# Override default hl/tier params from google-music-proto for Mobileclient.
params = {**call.params, **self.session.params}
response = self.session.request(
call.method,
call.url,
headers=call.headers,
data=call.body,
params=params,
allow_redirects=call.follow_redirects
)
try:
response.raise_for_status()
except requests.HTTPError:
raise
return call.parse_response(response.headers, response.content)
def _update_token(self):
dump_token(self.token, self.username, self.client)
@property
def is_authenticated(self):
"""The authentication status of the client instance."""
return self.session.authorized
@property
def token(self):
return self.session.token
@token.setter
def token(self, token):
self.session.token = token
@property
def username(self):
"""The username associated with the client instance.
This is used to store OAuth credentials for different accounts separately.
"""
return self._username
def login(self, username, *, token=None):
"""Log in to Google Music.
Parameters:
username (str, Optional): Your Google Music username.
Used for keeping stored OAuth tokens for multiple accounts separate.
device_id (str, Optional): A mobile device ID or music manager uploader ID.
Default: MAC address is used.
token (dict, Optional): An OAuth token compatible with ``requests-oauthlib``.
Returns:
bool: ``True`` if successfully authenticated, ``False`` if not.
"""
self._username = username
self._oauth(username, token=token)
return self.is_authenticated
# TODO: Revoke oauth token/delete oauth token file.
def logout(self):
"""Log out of Google Music."""
self.session = None
self._username = None
return True
# TODO Revoke oauth token/delete oauth token file.
def switch_user(self, username='', *, token=None):
"""Log in to Google Music with a different user.
Parameters:
username (str, Optional): Your Google Music username.
Used for keeping stored OAuth tokens for multiple accounts separate.
token (dict, Optional): An OAuth token compatible with ``requests-oauthlib``.
Returns:
bool: ``True`` if successfully authenticated, ``False`` if not.
"""
if self.logout():
return self.login(username, token=token)
return False
PK ! ( ( $ google_music/clients/mobileclient.py__all__ = ['MobileClient']
from collections import defaultdict
from operator import itemgetter
from uuid import getnode as get_mac
import google_music_proto.mobileclient.calls as mc_calls
import more_itertools
from google_music_proto.mobileclient.types import (
ListenNowItemType,
QueryResultType,
StationSeedType
)
from google_music_proto.oauth import IOS_CLIENT_ID, IOS_CLIENT_SECRET, MOBILE_SCOPE
from .base import GoogleMusicClient
from ..utils import create_mac_string, get_ple_prev_next
# TODO: 'max_results', 'start_token', 'updated_min', 'quality', etc.
# TODO: Podcast edits.
# TODO: Station create/edit.
# TODO: Difference between shuffles and instant mixes?
# TODO: Situations are now returned through a protobuf call?
class MobileClient(GoogleMusicClient):
"""API wrapper class to access Google Music mobile client functionality.
>>> from google_music import MobileClient
>>> mc = MobileClient('username')
Note:
Streaming requires a ``device_id`` from a valid, linked mobile device.
The :class:`MobileClient` instance's ``device_id`` can be changed after
instantiation, or a different ``device_id`` provided to :meth:`stream`
or :meth:`stream_url`.
Parameters:
username (str, Optional): Your Google Music username.
This is used to store OAuth credentials for different accounts separately.
device_id (str, Optional): A mobile device ID.
Default: An ID is generated from your system's MAC address.
token (dict, Optional): An OAuth token compatible with ``requests-oauthlib``.
locale (str, Optional): `ICU `__ locale used to
localize some responses. This must be a locale supported by Android.
Default: `'en_US'``.
"""
client = 'mobileclient'
client_id = IOS_CLIENT_ID
client_secret = IOS_CLIENT_SECRET
oauth_scope = MOBILE_SCOPE
def __init__(self, username=None, device_id=None, *, token=None, locale='en_US'):
username = username or ''
if self.login(username, token=token):
self.locale = locale
self.tier = 'fr'
if device_id is None:
mac_int = get_mac()
if (mac_int >> 40) % 2:
raise OSError("A valid MAC address could not be obtained.")
self.device_id = create_mac_string(mac_int, delimiter='')
else:
self.device_id = device_id
self.is_subscribed
def __repr__(self):
return f"MobileClient(username={self.username!r}, device_id={self.device_id}, token={self.token}, locale={self.locale})"
@property
def device_id(self):
"""The mobile device ID of the :class:`MobileClient` instance."""
return self.session.headers.get('X-Device-ID')
@device_id.setter
def device_id(self, device_id):
self.session.headers.update({'X-Device-ID': device_id})
@property
def is_subscribed(self):
"""The subscription status of the account linked to the :class:`MobileClient` instance."""
subscribed = next(
(
config_item['value'] == 'true'
for config_item in self.config()
if config_item['key'] == 'isNautilusUser'
),
None
)
if subscribed:
self.tier = 'aa'
else:
self.tier = 'fr'
return subscribed
@property
def locale(self):
"""The locale of the :class:`MobileClient` instance.
Can be changed after instantiation.
`ICU `__ locale used to localize some
responses. This must be a locale supported by Android.
"""
return self.session.params.get('hl')
@locale.setter
def locale(self, locale):
self.session.params.update({'hl': locale})
@property
def tier(self):
"""The subscription tier of the :class:`MobileClient` instance.
Can be changed after instantiation.
``aa`` if subscribed, ``fr`` if not.
"""
return self.session.params.get('tier')
@tier.setter
def tier(self, tier):
self.session.params.update({'tier': tier})
def album(self, album_id, *, include_description=True, include_songs=True):
"""Get information about an album.
Parameters:
album_id (str): An album ID. Album IDs start with a 'B'.
include_description (bool, Optional): Include description of the album in the returned dict.
include_songs (bool, Optional): Include songs from the album in the returned dict.
Default: ``True``.
Returns:
dict: Album information.
"""
response = self._call(
mc_calls.FetchAlbum,
album_id,
include_description=include_description,
include_tracks=include_songs
)
album_info = response.body
return album_info
def artist(
self, artist_id, *, include_albums=True, num_related_artists=5, num_top_tracks=5
):
"""Get information about an artist.
Parameters:
artist_id (str): An artist ID. Artist IDs start with an 'A'.
include_albums (bool, Optional): Include albums by the artist in returned dict.
Default: ``True``.
num_related_artists (int, Optional): Include up to given number of related artists in returned dict.
Default: ``5``.
num_top_tracks (int, Optional): Include up to given number of top tracks in returned dict.
Default: ``5``.
Returns:
dict: Artist information.
"""
response = self._call(
mc_calls.FetchArtist,
artist_id,
include_albums=include_albums,
num_related_artists=num_related_artists,
num_top_tracks=num_top_tracks
)
artist_info = response.body
return artist_info
def browse_podcasts(self, podcast_genre_id='JZCpodcasttopchartall'):
"""Get the podcasts for a genre from the Podcasts browse tab.
Parameters:
podcast_genre_id (str, Optional): A podcast genre ID as found
in :meth:`browse_podcasts_genres`.
Default: ``'JZCpodcasttopchartall'``.
Returns:
list: Podcast dicts.
"""
response = self._call(
mc_calls.PodcastBrowse,
podcast_genre_id=podcast_genre_id
)
podcast_series_list = response.body.get('series', [])
return podcast_series_list
def browse_podcasts_genres(self):
"""Get the genres from the Podcasts browse tab dropdown.
Returns:
list: Genre groups that contain sub groups.
"""
response = self._call(
mc_calls.PodcastBrowseHierarchy
)
genres = response.body.get('groups', [])
return genres
def browse_stations(self, station_category_id):
"""Get the stations for a category from Browse Stations.
Parameters:
station_category_id (str): A station category ID as
found with :meth:`browse_stations_categories`.
Returns:
list: Station dicts.
"""
response = self._call(
mc_calls.BrowseStations,
station_category_id
)
stations = response.body.get('stations', [])
return stations
def browse_stations_categories(self):
"""Get the categories from Browse Stations.
Returns:
list: Station categories that can contain subcategories.
"""
response = self._call(
mc_calls.BrowseStationCategories
)
station_categories = response.body.get('root', {}).get('subcategories', [])
return station_categories
def config(self):
"""Get a listing of mobile client configuration settings."""
response = self._call(
mc_calls.Config
)
config_list = response.body.get('data', {}).get('entries', [])
return config_list
# TODO: Check success/failure?
def device_deauthorize(self, device):
"""Deauthorize a registered device.
Parameters:
device (dict): A device dict as returned by :meth:`devices`.
"""
self._call(
mc_calls.DeviceManagementInfoDelete,
device['id']
)
# TODO: Set device dict as property of MobileClient?
def device_set(self, device):
"""Set device used by :class:`MobileClient` instance.
Parameters:
device (dict): A device dict as returned by :meth:`devices`.
"""
if device['id'].startswith('0x'):
self.device_id = device['id'][2:]
elif device['id'].startswith('ios:'):
self.device_id = device['id'].replace(':', '')
else:
self.device_id = device['id']
def devices(self):
"""Get a listing of devices registered to the Google Music account."""
response = self._call(
mc_calls.DeviceManagementInfo
)
registered_devices = response.body.get('data', {}).get('items', [])
return registered_devices
def explore_genres(self, parent_genre_id=None):
"""Get a listing of song genres.
Parameters:
parent_genre_id (str, Optional): A genre ID.
If given, a listing of this genre's sub-genres is returned.
Returns:
list: Genre dicts.
"""
response = self._call(
mc_calls.ExploreGenres,
parent_genre_id
)
genre_list = response.body.get('genres', [])
return genre_list
def explore_tabs(self, *, num_items=100, genre_id=None):
"""Get a listing of explore tabs.
Parameters:
num_items (int, Optional): Number of items per tab to return.
Default: ``100``
genre_id (genre_id, Optional): Genre ID from :meth:`explore_genres` to explore.
Default: ``None``.
Returns:
dict: Explore tabs content.
"""
response = self._call(
mc_calls.ExploreTabs,
num_items=num_items,
genre_id=genre_id
)
tab_list = response.body.get('tabs', [])
explore_tabs = {}
for tab in tab_list:
explore_tabs[tab['tab_type'].lower()] = tab
return explore_tabs
def listen_now_dismissed_items(self):
"""Get a listing of items dismissed from Listen Now tab."""
response = self._call(
mc_calls.ListenNowGetDismissedItems
)
dismissed_items = response.body.get('items', [])
return dismissed_items
def listen_now_items(self):
"""Get a listing of Listen Now items.
Note:
This does not include situations;
use the :meth:`situations` method instead.
Returns:
dict: With ``albums`` and ``stations`` keys of listen now items.
"""
response = self._call(
mc_calls.ListenNowGetListenNowItems
)
listen_now_item_list = response.body.get('listennow_items', [])
listen_now_items = defaultdict(list)
for item in listen_now_item_list:
type_ = f"{ListenNowItemType(item['type']).name}s"
listen_now_items[type_].append(item)
return dict(listen_now_items)
def new_releases(self, genre_id=None):
new_releases_tab = self.explore_tabs(genre_id=genre_id)['new_releases']
new_releases = []
if 'groups' in new_releases_tab:
for group in new_releases_tab['groups']:
for entity in group['entities']:
del entity['kind']
new_releases.append(entity.popitem()[1])
return new_releases
def playlist_song(self, playlist_song_id):
"""Get information about a playlist song.
Note:
This returns the playlist entry information only.
For full song metadata, use :meth:`song` with
the ``'trackId'`` field.
Parameters:
playlist_song_id (str): A playlist song ID.
Returns:
dict: Playlist song information.
"""
playlist_song_info = next(
(
playlist_song
for playlist in self.playlists(include_songs=True)
for playlist_song in playlist['tracks']
if playlist_song['id'] == playlist_song_id
),
None
)
return playlist_song_info
def playlist_song_add(
self,
song,
playlist,
*,
after=None,
before=None,
index=None,
position=None
):
"""Add a song to a playlist.
Note:
* Provide no optional arguments to add to end.
* Provide playlist song dicts for ``after`` and/or ``before``.
* Provide a zero-based ``index`` (can be negative).
* Provide a one-based ``position``.
Parameters:
song (dict): A song dict.
playlist (dict): A playlist dict.
after (dict, Optional): A playlist song dict ``songs`` will follow.
before (dict, Optional): A playlist song dict ``songs`` will precede.
index (int, Optional): The zero-based index position to insert ``song``.
position (int, Optional): The one-based position to insert ``song``.
Returns:
dict: Playlist dict including songs.
"""
prev, next_ = get_ple_prev_next(
self.playlist_songs(playlist),
after=after,
before=before,
index=index,
position=position
)
if 'storeId' in song:
song_id = song['storeId']
elif 'trackId' in song:
song_id = song['trackId']
else:
song_id = song['id']
mutation = mc_calls.PlaylistEntriesBatch.create(
song_id, playlist['id'],
preceding_entry_id=prev.get('id'),
following_entry_id=next_.get('id')
)
self._call(mc_calls.PlaylistEntriesBatch, mutation)
return self.playlist(playlist['id'], include_songs=True)
def playlist_songs_add(
self,
songs,
playlist,
*,
after=None,
before=None,
index=None,
position=None
):
"""Add songs to a playlist.
Note:
* Provide no optional arguments to add to end.
* Provide playlist song dicts for ``after`` and/or ``before``.
* Provide a zero-based ``index`` (can be negative).
* Provide a one-based ``position``.
Parameters:
songs (list): A list of song dicts.
playlist (dict): A playlist dict.
after (dict, Optional): A playlist song dict ``songs`` will follow.
before (dict, Optional): A playlist song dict ``songs`` will precede.
index (int, Optional): The zero-based index position to insert ``songs``.
position (int, Optional): The one-based position to insert ``songs``.
Returns:
dict: Playlist dict including songs.
"""
playlist_songs = self.playlist_songs(playlist)
prev, next_ = get_ple_prev_next(
playlist_songs,
after=after,
before=before,
index=index,
position=position
)
songs_len = len(songs)
for i, song in enumerate(songs):
if 'storeId' in song:
song_id = song['storeId']
elif 'trackId' in song:
song_id = song['trackId']
else:
song_id = song['id']
mutation = mc_calls.PlaylistEntriesBatch.create(
song_id, playlist['id'],
preceding_entry_id=prev.get('id'),
following_entry_id=next_.get('id')
)
response = self._call(mc_calls.PlaylistEntriesBatch, mutation)
result = response.body['mutate_response'][0]
# TODO: Proper exception on failure.
if result['response_code'] != 'OK':
break
if i < songs_len - 1:
prev = self.playlist_song(result['id'])
_, next_ = get_ple_prev_next(
self.playlist_songs(playlist),
after=prev
)
return self.playlist(playlist['id'], include_songs=True)
def playlist_song_delete(self, playlist_song):
"""Delete song from playlist.
Parameters:
playlist_song (str): A playlist song dict.
Returns:
dict: Playlist dict including songs.
"""
self.playlist_songs_delete([playlist_song])
return self.playlist(playlist_song['playlistId'], include_songs=True)
def playlist_songs_delete(self, playlist_songs):
"""Delete songs from playlist.
Parameters:
playlist_songs (list): A list of playlist song dicts.
Returns:
dict: Playlist dict including songs.
"""
if not more_itertools.all_equal(
playlist_song['playlistId']
for playlist_song in playlist_songs
):
raise ValueError(
"All 'playlist_songs' must be from the same playlist."
)
mutations = [mc_calls.PlaylistEntriesBatch.delete(playlist_song['id']) for playlist_song in playlist_songs]
self._call(mc_calls.PlaylistEntriesBatch, mutations)
return self.playlist(playlist_songs[0]['playlistId'], include_songs=True)
def playlist_song_move(
self,
playlist_song,
*,
after=None,
before=None,
index=None,
position=None
):
"""Move a song in a playlist.
Note:
* Provide no optional arguments to move to end.
* Provide playlist song dicts for ``after`` and/or ``before``.
* Provide a zero-based ``index`` (can be negative).
* Provide a one-based ``position``.
Parameters:
playlist_song (dict): A playlist song dict.
after (dict, Optional): A playlist song dict ``songs`` will follow.
before (dict, Optional): A playlist song dict ``songs`` will precede.
index (int, Optional): The zero-based index position to insert ``song``.
position (int, Optional): The one-based position to insert ``song``.
Returns:
dict: Playlist dict including songs.
"""
playlist_songs = self.playlist(
playlist_song['playlistId'],
include_songs=True
)['tracks']
prev, next_ = get_ple_prev_next(
playlist_songs,
after=after,
before=before,
index=index,
position=position
)
mutation = mc_calls.PlaylistEntriesBatch.update(
playlist_song,
preceding_entry_id=prev.get('id'),
following_entry_id=next_.get('id')
)
self._call(mc_calls.PlaylistEntriesBatch, mutation)
return self.playlist(playlist_song['playlistId'], include_songs=True)
def playlist_songs_move(
self,
playlist_songs,
*,
after=None,
before=None,
index=None,
position=None
):
"""Move songs in a playlist.
Note:
* Provide no optional arguments to move to end.
* Provide playlist song dicts for ``after`` and/or ``before``.
* Provide a zero-based ``index`` (can be negative).
* Provide a one-based ``position``.
Parameters:
playlist_songs (list): A list of playlist song dicts.
after (dict, Optional): A playlist song dict ``songs`` will follow.
before (dict, Optional): A playlist song dict ``songs`` will precede.
index (int, Optional): The zero-based index position to insert ``songs``.
position (int, Optional): The one-based position to insert ``songs``.
Returns:
dict: Playlist dict including songs.
"""
if not more_itertools.all_equal(
playlist_song['playlistId']
for playlist_song in playlist_songs
):
raise ValueError(
"All 'playlist_songs' must be from the same playlist."
)
playlist = self.playlist(
playlist_songs[0]['playlistId'],
include_songs=True
)
prev, next_ = get_ple_prev_next(
playlist['tracks'],
after=after,
before=before,
index=index,
position=position
)
playlist_songs_len = len(playlist_songs)
for i, playlist_song in enumerate(playlist_songs):
mutation = mc_calls.PlaylistEntriesBatch.update(
playlist_song,
preceding_entry_id=prev.get('id'),
following_entry_id=next_.get('id')
)
response = self._call(mc_calls.PlaylistEntriesBatch, mutation)
result = response.body['mutate_response'][0]
# TODO: Proper exception on failure.
if result['response_code'] != 'OK':
break
if i < playlist_songs_len - 1:
prev = self.playlist_song(result['id'])
_, next_ = get_ple_prev_next(
self.playlist_songs(playlist),
after=prev
)
return self.playlist(playlist_songs[0]['playlistId'], include_songs=True)
def playlist_songs(self, playlist):
"""Get a listing of songs from a playlist.
Paramters:
playlist (dict): A playlist dict.
Returns:
list: Playlist song dicts.
"""
playlist_type = playlist.get('type')
playlist_song_list = []
if playlist_type in ('USER_GENERATED', None):
start_token = None
playlist_song_list = []
while True:
response = self._call(
mc_calls.PlaylistEntryFeed,
max_results=49995,
start_token=start_token
)
items = response.body.get('data', {}).get('items', [])
if items:
playlist_song_list.extend(items)
start_token = response.body.get('nextPageToken')
if start_token is None:
break
elif playlist_type == 'SHARED':
playlist_share_token = playlist['shareToken']
start_token = None
playlist_song_list = []
while True:
response = self._call(
mc_calls.PlaylistEntriesShared,
playlist_share_token,
max_results=49995,
start_token=start_token
)
entry = response.body['entries'][0]
items = entry.get('playlistEntry', [])
if items:
playlist_song_list.extend(items)
start_token = entry.get('nextPageToken')
if start_token is None:
break
playlist_song_list.sort(key=itemgetter('absolutePosition'))
return playlist_song_list
def playlist(self, playlist_id, *, include_songs=False):
"""Get information about a playlist.
Parameters:
playlist_id (str): A playlist ID.
include_songs (bool, Optional): Include songs from
the playlist in the returned dict.
Default: ``False``
Returns:
dict: Playlist information.
"""
playlist_info = next(
(
playlist
for playlist in self.playlists(include_songs=include_songs)
if playlist['id'] == playlist_id
),
None
)
return playlist_info
def playlist_create(self, name, description='', *, make_public=False):
"""Create a playlist.
Parameters:
name (str): Name to give the playlist.
description (str): Description to give the playlist.
make_public (bool): If ``True`` and account has a subscription, make playlist public.
Default: ``False``
Returns:
dict: Playlist information.
"""
share_state = 'PUBLIC' if make_public else 'PRIVATE'
playlist = self._call(
mc_calls.PlaylistsCreate,
name,
description,
share_state
).body
return playlist
# TODO: Check success/failure?
def playlist_delete(self, playlist):
"""Delete a playlist.
Parameters:
playlist (dict): A playlist dict.
"""
self._call(
mc_calls.PlaylistsDelete,
playlist['id']
)
def playlist_edit(self, playlist, *, name=None, description=None, public=None):
"""Edit playlist(s).
Parameters:
playlist (dict): A playlist dict.
name (str): Name to give the playlist.
description (str): Description to give the playlist.
make_public (bool): If ``True`` and account has a subscription, make playlist public.
Default: ``False``
Returns:
dict: Playlist information.
"""
if all(
value is None
for value in (name, description, public)
):
raise ValueError(
'At least one of name, description, or public must be provided'
)
playlist_id = playlist['id']
playlist = self.playlist(playlist_id)
name = name if name is not None else playlist['name']
description = (
description if description is not None else playlist['description']
)
share_state = 'PUBLIC' if public else playlist['accessControlled']
playlist = self._call(
mc_calls.PlaylistsUpdate,
playlist_id,
name,
description,
share_state
).body
return playlist
def playlists(self, *, include_songs=False):
"""Get a listing of library playlists.
Parameters:
include_songs (bool, Optional): Include songs in the returned playlist dicts.
Default: ``False``.
Returns:
list: A list of playlist dicts.
"""
playlist_list = []
for chunk in self.playlists_iter(page_size=49995):
for playlist in chunk:
if include_songs:
playlist['tracks'] = self.playlist_songs(playlist)
playlist_list.append(playlist)
return playlist_list
def playlists_iter(self, *, start_token=None, page_size=250):
"""Get a paged iterator of library playlists.
Parameters:
start_token (str): The token of the page to return.
Default: Not sent to get first page.
page_size (int, Optional): The maximum number of results per returned page.
Max allowed is ``49995``.
Default: ``250``
Yields:
list: Playlist dicts.
"""
start_token = None
while True:
response = self._call(
mc_calls.PlaylistFeed,
max_results=page_size,
start_token=start_token
)
items = response.body.get('data', {}).get('items', [])
if items:
yield items
start_token = response.body.get('nextPageToken')
if start_token is None:
break
def podcast(self, podcast_series_id, *, max_episodes=50):
"""Get information about a podcast series.
Parameters:
podcast_series_id (str): A podcast series ID.
max_episodes (int, Optional): Include up to given number of episodes in returned dict.
Default: ``50``
Returns:
dict: Podcast series information.
"""
podcast_info = self._call(
mc_calls.PodcastFetchSeries,
podcast_series_id,
max_episodes=max_episodes
).body
return podcast_info
def podcasts(self, *, device_id=None):
"""Get a listing of subsribed podcast series.
Paramaters:
device_id (str, Optional): A mobile device ID.
Default: Use ``device_id`` of the :class:`MobileClient` instance.
Returns:
list: Podcast series dict.
"""
if device_id is None:
device_id = self.device_id
podcast_list = []
for chunk in self.podcasts_iter(device_id=device_id, page_size=49995):
podcast_list.extend(chunk)
return podcast_list
def podcasts_iter(self, *, device_id=None, page_size=250):
"""Get a paged iterator of subscribed podcast series.
Parameters:
device_id (str, Optional): A mobile device ID.
Default: Use ``device_id`` of the :class:`MobileClient` instance.
page_size (int, Optional): The maximum number of results per returned page.
Max allowed is ``49995``.
Default: ``250``
Yields:
list: Podcast series dicts.
"""
if device_id is None:
device_id = self.device_id
start_token = None
prev_items = None
while True:
response = self._call(
mc_calls.PodcastSeries,
device_id,
max_results=page_size,
start_token=start_token
)
items = response.body.get('data', {}).get('items', [])
# Google does some weird shit.
if items != prev_items:
subscribed_podcasts = [
item
for item in items
if item.get('userPreferences', {}).get('subscribed')
]
yield subscribed_podcasts
prev_items = items
else:
break
start_token = response.body.get('nextPageToken')
if start_token is None:
break
def podcast_episode(self, podcast_episode_id):
"""Get information about a podcast_episode.
Parameters:
podcast_episode_id (str): A podcast episode ID.
Returns:
dict: Podcast episode information.
"""
response = self._call(
mc_calls.PodcastFetchEpisode,
podcast_episode_id
)
podcast_episode_info = [
podcast_episode
for podcast_episode in response.body
if not podcast_episode['deleted']
]
return podcast_episode_info
def podcast_episodes(self, *, device_id=None):
"""Get a listing of podcast episodes for all subscribed podcasts.
Paramaters:
device_id (str, Optional): A mobile device ID.
Default: Use ``device_id`` of the :class:`MobileClient` instance.
Returns:
list: Podcast episode dicts.
"""
if device_id is None:
device_id = self.device_id
podcast_episode_list = []
for chunk in self.podcast_episodes_iter(
device_id=device_id,
page_size=49995
):
podcast_episode_list.extend(chunk)
return podcast_episode_list
def podcast_episodes_iter(self, *, device_id=None, page_size=250):
"""Get a paged iterator of podcast episode for all subscribed podcasts.
Parameters:
device_id (str, Optional): A mobile device ID.
Default: Use ``device_id`` of the :class:`MobileClient` instance.
page_size (int, Optional): The maximum number of results per returned page.
Max allowed is ``49995``.
Default: ``250``
Yields:
list: Podcast episode dicts.
"""
if device_id is None:
device_id = self.device_id
start_token = None
prev_items = None
while True:
response = self._call(
mc_calls.PodcastEpisode,
device_id,
max_results=page_size,
start_token=start_token
)
items = response.body.get('data', {}).get('items', [])
# Google does some weird shit.
if items != prev_items:
yield items
prev_items = items
else:
break
start_token = response.body.get('nextPageToken')
if start_token is None:
break
def search(self, query, *, max_results=100, **kwargs):
"""Search Google Music and library for content.
Parameters:
query (str): Search text.
max_results (int, Optional): Maximum number of results per type per
location to retrieve. I.e up to 100 Google and 100 library
for a total of 200 for the default value.
Google only accepts values up to 100.
Default: ``100``
kwargs (bool, Optional): Any of ``albums``, ``artists``, ``genres``,
``playlists``, ``podcasts``, ``situations``, ``songs``, ``stations``,
``videos`` set to ``True`` will include that result type in the
returned dict.
Setting none of them will include all result types in the returned dict.
Returns:
dict: A dict of results separated into keys: ``'albums'``, ``'artists'``,
``'genres'``, ``'playlists'``, ```'podcasts'``, ``'situations'``,
``'songs'``, ``'stations'``, ``'videos'``.
Note:
Free account search is restricted so may not contain hits for all result types.
"""
results = defaultdict(list)
for type_, results_ in self.search_library(
query,
max_results=max_results,
**kwargs
).items():
results[type_].extend(results_)
for type_, results_ in self.search_google(
query,
max_results=max_results,
**kwargs
).items():
results[type_].extend(results_)
return dict(results)
def search_google(self, query, *, max_results=100, **kwargs):
"""Search Google Music for content.
Parameters:
query (str): Search text.
max_results (int, Optional): Maximum number of results per type to retrieve.
Google only accepts values up to 100.
Default: ``100``
kwargs (bool, Optional): Any of ``albums``, ``artists``, ``genres``,
``playlists``, ``podcasts``, ``situations``, ``songs``, ``stations``,
``videos`` set to ``True`` will include that result type in the
returned dict.
Setting none of them will include all result types in the returned dict.
Returns:
dict: A dict of results separated into keys: ``'albums'``, ``'artists'``,
``'genres'``, ``'playlists'``, ```'podcasts'``, ``'situations'``,
``'songs'``, ``'stations'``, ``'videos'``.
Note:
Free account search is restricted so may not contain hits for all result types.
"""
response = self._call(
mc_calls.Query,
query,
max_results=max_results,
**kwargs
)
clusters = response.body.get('clusterDetail', [])
results = defaultdict(list)
for cluster in clusters:
result_type = QueryResultType(cluster['cluster']['type']).name
entries = cluster.get('entries', [])
if len(entries) > 0:
for entry in entries:
item_key = next(
key
for key in entry
if key not in ['cluster', 'score', 'type']
)
results[f"{result_type}s"].append(entry[item_key])
return dict(results)
def search_library(self, query, *, max_results=100, **kwargs):
"""Search Google Music for content.
Parameters:
query (str): Search text.
max_results (int, Optional): Maximum number of results per type to retrieve.
Default: ``100``
kwargs (bool, Optional): Any of ``playlists``, ``podcasts``,
``songs``, ``stations``, ``videos`` set to ``True``
will include that result type in the returned dict.
Setting none of them will include all result types in the returned dict.
Returns:
dict: A dict of results separated into keys:
``'playlists'``, ``'podcasts'``, ``'songs'``, ``'stations'``.
"""
def match_fields(item, fields):
return any(
query.casefold() in item.get(field, '').casefold()
for field in fields
)
types = [
(
'playlists',
['description', 'name'],
self.playlists
),
(
'podcasts',
['author', 'description', 'title'],
self.podcasts
),
(
'songs',
['album', 'albumArtist', 'artist', 'composer', 'genre', 'title'],
self.songs
),
(
'stations',
['byline', 'description', 'name'],
self.stations
),
]
results = {}
for type_, fields, func in types:
if (not kwargs) or (type_ in kwargs):
results[type_] = [
item
for item in func()
if match_fields(item, fields)
][:max_results]
return results
def search_suggestion(self, query):
"""Get search query suggestions for query.
Parameters:
query (str): Search text.
Returns:
list: Suggested query strings.
"""
response = self._call(
mc_calls.QuerySuggestion,
query
)
suggested_queries = response.body.get('suggested_queries', [])
return [
suggested_query['suggestion_string']
for suggested_query in suggested_queries
]
def shuffle_album(
self, album, *, num_songs=100, only_library=False, recently_played=None
):
"""Get a listing of album shuffle/mix songs.
Parameters:
album (dict): An album dict.
num_songs (int, Optional): The maximum number of songs to return from the station.
Default: ``100``
only_library (bool, Optional): Only return content from library.
Default: False
recently_played (list, Optional): A list of dicts in the form of {'id': '', 'type'}
where ``id`` is a song ID and ``type`` is 0 for a library song and 1 for a store song.
Returns:
list: List of album shuffle/mix songs.
"""
station_info = {
'seed': {
'albumId': album['albumId'],
'seedType': StationSeedType.album.value
},
'num_entries': num_songs,
'library_content_only': only_library,
}
if recently_played is not None:
station_info['recently_played'] = recently_played
response = self._call(
mc_calls.RadioStationFeed,
station_infos=[station_info]
)
station_feed = response.body.get('data', {}).get('stations', [])
try:
station = station_feed[0]
except IndexError:
station = {}
return station.get('tracks', [])
def shuffle_artist(
self,
artist,
*,
num_songs=100,
only_library=False,
recently_played=None,
only_artist=False
):
"""Get a listing of artist shuffle/mix songs.
Parameters:
artist (dict): An artist dict.
num_songs (int, Optional): The maximum number of songs to return from the station.
Default: ``100``
only_library (bool, Optional): Only return content from library.
Default: False
recently_played (list, Optional): A list of dicts in the form of {'id': '', 'type'}
where ``id`` is a song ID and ``type`` is 0 for a library song and 1 for a store song.
only_artist (bool, Optional): If ``True``, only return songs from the artist,
else return songs from artist and related artists.
Default: ``False``
Returns:
list: List of artist shuffle/mix songs.
"""
station_info = {
'num_entries': num_songs,
'library_content_only': only_library
}
if only_artist:
station_info['seed'] = {
'artistId': artist['artistId'],
'seedType': StationSeedType.artist_only.value
}
else:
station_info['seed'] = {
'artistId': artist['artistId'],
'seedType': StationSeedType.artist_related.value
}
if recently_played is not None:
station_info['recently_played'] = recently_played
response = self._call(
mc_calls.RadioStationFeed,
station_infos=[station_info]
)
station_feed = response.body.get('data', {}).get('stations', [])
try:
station = station_feed[0]
except IndexError:
station = {}
return station.get('tracks', [])
def shuffle_genre(
self, genre, *, num_songs=100, only_library=False, recently_played=None
):
"""Get a listing of genre shuffle/mix songs.
Parameters:
genre (dict): A genre dict.
num_songs (int, Optional): The maximum number of songs to return from the station. Default: ``100``
only_library (bool, Optional): Only return content from library.
Default: False
recently_played (list, Optional): A list of dicts in the form of {'id': '', 'type'} where ``id`` is a song ID and
``type`` is 0 for a library song and 1 for a store song.
Returns:
list: List of genre shuffle/mix songs.
"""
station_info = {
'seed': {
'genreId': genre['id'],
'seedType': StationSeedType.genre.value,
},
'num_entries': num_songs,
'library_content_only': only_library
}
if recently_played is not None:
station_info['recently_played'] = recently_played
response = self._call(
mc_calls.RadioStationFeed,
station_infos=[station_info]
)
station_feed = response.body.get('data', {}).get('stations', [])
try:
station = station_feed[0]
except IndexError:
station = {}
return station.get('tracks', [])
def shuffle_song(
self, song, *, num_songs=100, only_library=False, recently_played=None
):
"""Get a listing of song shuffle/mix songs.
Parameters:
song (dict): A song dict.
num_songs (int, Optional): The maximum number of songs to return from the station.
Default: ``100``
only_library (bool, Optional): Only return content from library.
Default: False
recently_played (list, Optional): A list of dicts in the form of {'id': '', 'type'}
where ``id`` is a song ID and ``type`` is 0 for a library song and 1 for a store song.
Returns:
list: List of artist shuffle/mix songs.
"""
station_info = {
'num_entries': num_songs,
'library_content_only': only_library
}
if 'storeId' in song:
station_info['seed'] = {
'trackId': song['storeId'],
'seedType': StationSeedType.store_track.value
}
else:
station_info['seed'] = {
'trackLockerId': song['id'],
'seedType': StationSeedType.library_track.value
}
if recently_played is not None:
station_info['recently_played'] = recently_played
response = self._call(mc_calls.RadioStationFeed, station_infos=[station_info])
station_feed = response.body.get('data', {}).get('stations', [])
try:
station = station_feed[0]
except IndexError:
station = {}
return station.get('tracks', [])
def situations(self, *, tz_offset=None):
"""Get a listing of situations.
Parameters:
tz_offset (int, Optional): A time zone offset from UTC in seconds.
"""
response = self._call(
mc_calls.ListenNowSituations,
tz_offset
)
situation_list = response.body.get('situations', [])
return situation_list
def song(self, song_id):
"""Get information about a song.
Parameters:
song_id (str): A song ID.
Returns:
dict: Song information.
"""
if song_id.startswith('T'):
song_info = self._call(
mc_calls.FetchTrack,
song_id
).body
else:
song_info = next(
(
song
for song in self.songs()
if song['id'] == song_id
),
None
)
return song_info
def song_add(self, song):
"""Add a store song to your library.
Parameters:
song (dict): A store song dict.
Returns:
str: Song's library ID.
"""
return self.songs_add([song])[0]
def songs_add(self, songs):
"""Add store songs to your library.
Parameters:
songs (list): A list of store song dicts.
Returns:
list: Songs' library IDs.
"""
mutations = [mc_calls.TrackBatch.add(song) for song in songs]
response = self._call(
mc_calls.TrackBatch,
mutations
)
success_ids = [
res['id']
for res in response.body['mutate_response']
if res['response_code'] == 'OK'
]
return success_ids
def song_delete(self, song):
"""Delete song from library.
Parameters:
song (str): A library song dict.
Returns:
str: Successfully deleted song ID.
"""
return self.songs_delete([song])[0]
def songs_delete(self, songs):
"""Delete songs from library.
Parameters:
song (list): A list of song dicts.
Returns:
list: Successfully deleted song IDs.
"""
mutations = [mc_calls.TrackBatch.delete(song['id']) for song in songs]
response = self._call(
mc_calls.TrackBatch,
mutations
)
success_ids = [
res['id']
for res in response.body['mutate_response']
if res['response_code'] == 'OK'
]
# TODO: Report failures.
# failure_ids = [
# res['id']
# for res in response.body['mutate_response']
# if res['response_code'] != 'OK'
# ]
return success_ids
def song_play(self, song):
"""Add play to song play count.
Parameters:
song (dict): A song dict.
Returns:
bool: ``True`` if successful, ``False`` if not.
"""
if 'storeId' in song:
song_id = song['storeId']
elif 'trackId' in song:
song_id = song['trackId']
else:
song_id = song['id']
song_duration = song['durationMillis']
event = mc_calls.ActivityRecordRealtime.play(song_id, song_duration)
response = self._call(
mc_calls.ActivityRecordRealtime,
event
)
return True if response.body['eventResults'][0]['code'] == 'OK' else False
def song_rate(self, song, rating):
"""Rate song.
Parameters:
song (dict): A song dict.
rating (int): 0 (not rated), 1 (thumbs down), or 5 (thumbs up).
Returns:
bool: ``True`` if successful, ``False`` if not.
"""
if 'storeId' in song:
song_id = song['storeId']
elif 'trackId' in song:
song_id = song['trackId']
else:
song_id = song['id']
event = mc_calls.ActivityRecordRealtime.rate(song_id, rating)
response = self._call(
mc_calls.ActivityRecordRealtime,
event
)
return True if response.body['eventResults'][0]['code'] == 'OK' else False
def songs(self):
"""Get a listing of library songs.
Returns:
list: Song dicts.
"""
song_list = []
for chunk in self.songs_iter(page_size=49995):
song_list.extend(chunk)
return song_list
def songs_iter(self, *, page_size=250):
"""Get a paged iterator of library songs.
Parameters:
page_size (int, Optional): The maximum number of results per returned page.
Max allowed is ``49995``.
Default: ``250``
Yields:
list: Song dicts.
"""
start_token = None
while True:
response = self._call(
mc_calls.TrackFeed,
max_results=page_size,
start_token=start_token
)
items = response.body.get('data', {}).get('items', [])
if items:
yield items
start_token = response.body.get('nextPageToken')
if start_token is None:
break
# TODO: Investigate library_content_only.
# TODO: Figure out 'radio/stationfeed'.
def station(self, station_id, *, num_songs=25, recently_played=None):
"""Get information about a station.
Parameters:
station_id (str): A station ID. Use 'IFL' for I'm Feeling Lucky.
num_songs (int, Optional): The maximum number of songs to return from the station.
Default: ``25``
recently_played (list, Optional): A list of dicts in the form of {'id': '', 'type'}
where ``id`` is a song ID and ``type`` is 0 for a library song and 1 for a store song.
Returns:
dict: Station information.
"""
station_info = {
'station_id': station_id,
'num_entries': num_songs,
'library_content_only': False
}
if recently_played is not None:
station_info['recently_played'] = recently_played
response = self._call(
mc_calls.RadioStationFeed,
station_infos=[station_info]
)
station_feed = response.body.get('data', {}).get('stations', [])
try:
station = station_feed[0]
except IndexError:
station = {}
return station
# TODO: Figure out 'radio/stationfeed'.
def station_feed(self, *, num_songs=25, num_stations=4):
"""Generate stations.
Note:
A Google Music subscription is required.
Parameters:
num_songs (int, Optional): The total number of songs to return. Default: ``25``
num_stations (int, Optional): The number of stations to return when no station_infos is provided.
Default: ``5``
Returns:
list: Station information dicts.
"""
response = self._call(
mc_calls.RadioStationFeed,
num_entries=num_songs,
num_stations=num_stations
)
station_feed = response.body.get('data', {}).get('stations', [])
return station_feed
def station_songs(self, station, *, num_songs=25, recently_played=None):
"""Get a listing of songs from a station.
Parameters:
station (str): A station dict.
num_songs (int, Optional): The maximum number of songs to return from the station. Default: ``25``
recently_played (list, Optional): A list of dicts in the form of {'id': '', 'type'}
where ``id`` is a song ID and ``type`` is 0 for a library song and 1 for a store song.
Returns:
list: Station song dicts.
"""
station_id = station['id']
station = self.station(
station_id,
num_songs=num_songs,
recently_played=recently_played
)
return station.get('tracks', [])
def stations(self, *, generated=True, library=True):
"""Get a listing of library stations.
The listing can contain stations added to the library and generated from the library.
Parameters:
generated (bool, Optional): Include generated stations.
Default: True
library (bool, Optional): Include library stations.
Default: True
Returns:
list: Station information dicts.
"""
station_list = []
for chunk in self.stations_iter(page_size=49995):
for station in chunk:
if (
(generated and not station.get('inLibrary'))
or (library and station.get('inLibrary'))
):
station_list.append(station)
return station_list
def stations_iter(self, *, page_size=250):
"""Get a paged iterator of library stations.
Parameters:
page_size (int, Optional): The maximum number of results per returned page.
Max allowed is ``49995``.
Default: ``250``
Yields:
list: Station dicts.
"""
start_token = None
while True:
response = self._call(
mc_calls.RadioStation,
max_results=page_size,
start_token=start_token
)
yield response.body.get('data', {}).get('items', [])
start_token = response.body.get('nextPageToken')
if start_token is None:
break
def stream(self, item, *, device_id=None, quality='hi', session_token=None):
"""Get MP3 stream of a podcast episode, library song, station_song, or store song.
Note:
Streaming requires a ``device_id`` from a valid, linked mobile device.
Parameters:
item (str): A podcast episode, library song, station_song, or store song.
A Google Music subscription is required to stream store songs.
device_id (str, Optional): A mobile device ID.
Default: Use ``device_id`` of the :class:`MobileClient` instance.
quality (str, Optional): Stream quality is one of ``'hi'`` (320Kbps), ``'med'`` (160Kbps), or ``'low'`` (128Kbps).
Default: ``'hi'``.
session_token (str): Session token from a station dict required for unsubscribed users to stream a station song.
station['sessionToken'] as returend by :meth:`station` only exists for free accounts.
Returns:
bytes: An MP3 file.
"""
if device_id is None:
device_id = self.device_id
stream_url = self.stream_url(
item,
device_id=device_id,
quality=quality,
session_token=session_token
)
response = self.session.get(stream_url)
audio = response.content
return audio
def stream_url(self, item, *, device_id=None, quality='hi', session_token=None):
"""Get a URL to stream a podcast episode, library song, station_song, or store song.
Note:
Streaming requires a ``device_id`` from a valid, linked mobile device.
Parameters:
item (str): A podcast episode, library song, station_song, or store song.
A Google Music subscription is required to stream store songs.
device_id (str, Optional): A mobile device ID.
Default: Use ``device_id`` of the :class:`MobileClient` instance.
quality (str, Optional): Stream quality is one of ``'hi'`` (320Kbps), ``'med'`` (160Kbps), or ``'low'`` (128Kbps).
Default: ``'hi'``.
session_token (str): Session token from a station dict required for unsubscribed users to stream a station song.
station['sessionToken'] as returend by :meth:`station` only exists for free accounts.
Returns:
str: A URL to an MP3 file.
"""
if device_id is None:
device_id = self.device_id
if 'episodeId' in item: # Podcast episode.
response = self._call(
mc_calls.PodcastEpisodeStreamURL,
item['episodeId'],
quality=quality,
device_id=device_id
)
elif 'wentryid' in item: # Free account station song.
response = self._call(
mc_calls.RadioStationTrackStreamURL,
item['storeId'],
item['wentryid'],
session_token,
quality=quality,
device_id=device_id
)
elif 'trackId' in item: # Playlist song.
response = self._call(
mc_calls.TrackStreamURL,
item['trackId'],
quality=quality,
device_id=device_id
)
elif 'storeId' in item and self.is_subscribed: # Store song.
response = self._call(
mc_calls.TrackStreamURL,
item['storeId'],
quality=quality,
device_id=device_id
)
elif 'id' in item: # Library song.
response = self._call(
mc_calls.TrackStreamURL,
item['id'],
quality=quality,
device_id=device_id
)
else:
# TODO: Create an exception for not being subscribed or use a better builtin exception for this case.
if 'storeId' in item and not self.is_subscribed:
msg = "Can't stream a store song without a subscription."
else:
msg = "Item does not contain an ID field."
raise ValueError(msg)
try:
stream_url = response.headers['Location']
except KeyError:
stream_url = response.body['url']
return stream_url
def thumbs_up_songs(self, *, library=True, store=True):
"""Get a listing of 'Thumbs Up' store songs.
Parameters:
library (bool, Optional): Include 'Thumbs Up' songs from library.
Default: True
generated (bool, Optional): Include 'Thumbs Up' songs from store.
Default: True
Returns:
list: Dicts of 'Thumbs Up' songs.
"""
thumbs_up_songs = []
if library is True:
thumbs_up_songs.extend(
song
for song in self.songs()
if song.get('rating', '0') == '5'
)
if store is True:
response = self._call(mc_calls.EphemeralTop)
thumbs_up_songs.extend(response.body.get('data', {}).get('items', []))
return thumbs_up_songs
def top_charts(self):
"""Get a listing of the default top charts."""
response = self._call(mc_calls.BrowseTopChart)
top_charts = response.body
return top_charts
def top_charts_for_genre(self, genre_id):
"""Get a listing of top charts for a top chart genre.
Parameters:
genre_id (str): A top chart genre ID as found with :meth:`top_charts_genres`.
"""
response = self._call(mc_calls.BrowseTopChartForGenre, genre_id)
top_chart_for_genre = response.body
return top_chart_for_genre
def top_charts_genres(self):
"""Get a listing of genres from the browse top charts tab."""
response = self._call(mc_calls.BrowseTopChartGenres)
top_chart_genres = response.body.get('genres', [])
return top_chart_genres
PK ! ,D+ D+ $ google_music/clients/musicmanager.py__all__ = ['MusicManager']
import socket
import subprocess
import time
from pathlib import Path
from urllib.parse import unquote
from uuid import getnode as get_mac
import audio_metadata
import google_music_proto.musicmanager.calls as mm_calls
from google_music_proto.musicmanager.pb import locker_pb2, upload_pb2
from google_music_proto.musicmanager.utils import transcode_to_mp3
from google_music_proto.oauth import (
MUSICMANAGER_CLIENT_ID,
MUSICMANAGER_CLIENT_SECRET,
MUSICMANAGER_SCOPE
)
from tenacity import stop_after_attempt
from .base import GoogleMusicClient
from ..utils import create_mac_string
class MusicManager(GoogleMusicClient):
"""API wrapper class to access Google Music Music Manager functionality.
>>> from google_music import MusicManager
>>> mm = MusicManager('username')
Parameters:
username (str, Optional): Your Google Music username.
This is used to store OAuth credentials for different accounts separately.
uploader_id (str, Optional): A unique uploader ID. Default: MAC address incremented by 1 is used.
token (dict, Optional): An OAuth token compatible with ``requests-oauthlib``.
"""
client = 'musicmanager'
client_id = MUSICMANAGER_CLIENT_ID
client_secret = MUSICMANAGER_CLIENT_SECRET
oauth_scope = MUSICMANAGER_SCOPE
def __init__(self, username=None, uploader_id=None, *, token=None):
username = username or ''
if self.login(username, token=token):
if uploader_id is None:
mac_int = get_mac()
if (mac_int >> 40) % 2:
raise OSError("A valid MAC address could not be obtained.")
mac_string = create_mac_string(mac_int)
if username:
uploader_id = f"{mac_string}-{username}"
else:
uploader_id = mac_string
uploader_name = f"{socket.gethostname()} ({self.session.headers['User-Agent']})"
self._upauth(uploader_id, uploader_name)
def __repr__(self):
return f"MusicManager(username={self.username!r}, uploader_id={self.uploader_id}, token={self.token})"
def _upauth(self, uploader_id, uploader_name):
self._call(mm_calls.UpAuth, uploader_id, uploader_name)
self._uploader_id = uploader_id
self._uploader_name = uploader_name
@property
def uploader_id(self):
"""The uploader ID of the :class:`MusicManager` instance."""
return self._uploader_id
@property
def uploader_name(self):
"""The uploader name of the :class:`MusicManager` instance."""
return self._uploader_name
def download(self, song):
"""Download a song from a Google Music library.
Parameters:
song (dict): A song dict.
Returns:
tuple: Song content as bytestring, suggested filename.
"""
song_id = song['id']
response = self._call(
mm_calls.Export,
self.uploader_id,
song_id)
audio = response.body
suggested_filename = unquote(
response.headers['Content-Disposition'].split("filename*=UTF-8''")[-1]
)
return (audio, suggested_filename)
def quota(self):
"""Get the uploaded track count and allowance.
Returns:
tuple: Number of uploaded tracks, number of tracks allowed.
"""
response = self._call(
mm_calls.ClientState,
self.uploader_id
)
client_state = response.body.clientstate_response
return (client_state.total_track_count, client_state.locker_track_limit)
def songs(self, *, uploaded=True, purchased=True):
"""Get a listing of Music Library songs.
Returns:
list: Song dicts.
"""
if not uploaded and not purchased:
raise ValueError("'uploaded' and 'purchased' cannot both be False.")
if purchased and uploaded:
song_list = []
for chunk in self.songs_iter(export_type=1):
song_list.extend(chunk)
elif purchased:
song_list = []
for chunk in self.songs_iter(export_type=2):
song_list.extend(chunk)
elif uploaded:
purchased_songs = []
for chunk in self.songs_iter(export_type=2):
purchased_songs.extend(chunk)
song_list = [
song
for chunk in self.songs_iter(export_type=1)
for song in chunk
if song not in purchased_songs
]
return song_list
def songs_iter(self, *, continuation_token=None, export_type=1):
"""Get a paged iterator of Music Library songs.
Parameters:
continuation_token (str, Optional): The token of the page to return.
Default: Not sent to get first page.
export_type (int, Optional): The type of tracks to return. 1 for all tracks, 2 for promotional and purchased.
Default: ``1``
Yields:
list: Song dicts.
"""
def track_info_to_dict(track_info):
return dict(
(field.name, value)
for field, value in track_info.ListFields()
)
while True:
response = self._call(
mm_calls.ExportIDs,
self.uploader_id,
continuation_token=continuation_token,
export_type=export_type
)
items = [
track_info_to_dict(track_info)
for track_info in response.body.download_track_info
]
if items:
yield items
continuation_token = response.body.continuation_token
if not continuation_token:
break
# TODO: Is there a better return value?
# TODO: Can more of this code be moved into calls and still leave viable control flow?
def upload(
self,
song,
*,
album_art_path=None,
no_sample=False
):
"""Upload a song to a Google Music library.
Parameters:
song (os.PathLike or str or audio_metadata.Format):
The path to an audio file or an instance of :class:`audio_metadata.Format`.
album_art_path (os.PathLike or str, Optional):
The relative filename or absolute filepath to external album art.
no_sample(bool, Optional):
Don't generate an audio sample from song;
send empty audio sample.
Default: Create an audio sample using ffmpeg/avconv.
Returns:
dict: A result dict with keys: ``'filepath'``, ``'success'``, ``'reason'``, and ``'song_id'`` (if successful).
"""
if not isinstance(song, audio_metadata.Format):
try:
song = audio_metadata.load(song)
except audio_metadata.UnsupportedFormat:
raise ValueError("'song' must be FLAC, MP3, or WAV.")
if album_art_path:
album_art_path = Path(album_art_path).resolve()
if album_art_path.is_file():
with album_art_path.open('rb') as image_file:
external_art = image_file.read()
else:
external_art = None
else:
external_art = None
result = {'filepath': Path(song.filepath)}
track_info = mm_calls.Metadata.get_track_info(song)
response = self._call(
mm_calls.Metadata,
self.uploader_id,
[track_info]
)
metadata_response = response.body.metadata_response
if metadata_response.signed_challenge_info: # Sample requested.
sample_request = metadata_response.signed_challenge_info[0]
try:
track_sample = mm_calls.Sample.generate_sample(
song,
track_info,
sample_request,
external_art=external_art,
no_sample=no_sample
)
response = self._call(
mm_calls.Sample,
self.uploader_id,
[track_sample]
)
track_sample_response = response.body.sample_response.track_sample_response[0]
except (OSError, ValueError, subprocess.CalledProcessError):
raise # TODO
else:
track_sample_response = metadata_response.track_sample_response[0]
response_code = track_sample_response.response_code
if response_code == upload_pb2.TrackSampleResponse.MATCHED:
result.update(
{
'success': True,
'reason': 'Matched',
'song_id': track_sample_response.server_track_id
}
)
elif response_code == upload_pb2.TrackSampleResponse.UPLOAD_REQUESTED:
server_track_id = track_sample_response.server_track_id
self._call(
mm_calls.UploadState,
self.uploader_id,
'START'
)
attempts = 0
should_retry = True
while should_retry and attempts <= 10:
# Call with tenacity.retry_with to disable automatic retries.
response = self._call.retry_with(stop=stop_after_attempt(1))(
self,
mm_calls.ScottyAgentPost,
self.uploader_id,
server_track_id,
track_info,
song,
external_art=external_art,
total_song_count=1,
total_uploaded_count=0
)
session_response = response.body
if 'sessionStatus' in session_response:
break
try:
# WHY, GOOGLE?! WHY???????????
status_code = session_response['errorMessage']['additionalInfo'][
'uploader_service.GoogleRupioAdditionalInfo'
]['completionInfo']['customerSpecificInfo'][
'ResponseCode'
]
except KeyError:
status_code = None
if status_code == 503: # Upload server still syncing.
should_retry = True
reason = "Server syncing"
elif status_code == 200: # Song is already uploaded.
should_retry = False
reason = "Already uploaded"
elif status_code == 404: # Rejected.
should_retry = False
reason = "Rejected"
else:
should_retry = True
reason = "Unkown error"
attempts += 1
time.sleep(2) # Give the server time to sync.
else:
result.update(
{
'success': False,
'reason': f'Could not get upload session: {reason}'
}
)
if 'success' not in result:
transfer = session_response['sessionStatus']['externalFieldTransfers'][0]
upload_url = transfer['putInfo']['url']
content_type = transfer.get('content_type', 'audio/mpeg')
original_content_type = track_info.original_content_type
transcode = (
isinstance(song, audio_metadata.WAV)
or original_content_type != locker_pb2.Track.MP3
)
if (
transcode
or original_content_type == locker_pb2.Track.MP3
):
if transcode:
audio_file = transcode_to_mp3(song, quality='320k')
else:
with open(song.filepath, 'rb') as f:
audio_file = f.read()
# Google Music allows a maximum file size of 300 MiB.
if len(audio_file) >= 300 * 1024 * 1024:
result.update(
{
'success': False,
'reason': 'Maximum allowed file size is 300 MiB.'
}
)
else:
upload_response = self._call(
mm_calls.ScottyAgentPut,
upload_url,
audio_file,
content_type=content_type
).body
if upload_response.get('sessionStatus', {}).get('state'):
result.update(
{
'success': True,
'reason': 'Uploaded',
'song_id': track_sample_response.server_track_id
}
)
else:
result.update(
{
'success': False,
'reason': upload_response # TODO: Better error details.
}
)
else:
# Do not upload files if transcode option set to False.
result.update(
{
'success': False,
'reason': 'Transcoding disabled for file type.'
}
)
self._call(mm_calls.UploadState, self.uploader_id, 'STOPPED')
else:
response_codes = upload_pb2._TRACKSAMPLERESPONSE.enum_types[0]
response_type = response_codes.values_by_number[
track_sample_response.response_code
].name
reason = response_type
result.update(
{
'success': False,
'reason': f'{reason}'
}
)
if response_type == 'ALREADY_EXISTS':
result['song_id'] = track_sample_response.server_track_id
return result
PK ! mj j google_music/session.py__all__ = ['GoogleMusicSession']
import json
from pathlib import Path
import appdirs
from requests_oauthlib import OAuth2Session
from . import __author__, __title__, __version__
TOKEN_DIR = Path(appdirs.user_data_dir(__title__, __author__))
def dump_token(token, username, client):
username = username or ''
token_path = TOKEN_DIR / username / f'{client}.token'
try:
token_path.parent.mkdir(parents=True)
except FileExistsError:
pass
with token_path.open('w') as f:
json.dump(token, f)
def load_token(username, client):
username = username or ''
token_path = TOKEN_DIR / username / f'{client}.token'
with token_path.open('r') as f:
token = json.load(f)
return token
class GoogleMusicSession(OAuth2Session):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.headers.update(
{'User-Agent': f'{__title__}/{__version__}'}
)
PK ! ZlB google_music/utils.py__all__ = ['create_mac_string', 'get_ple_prev_next']
def create_mac_string(mac_int, *, delimiter=':'):
mac = hex(mac_int)[2:].upper()
pad = max(12 - len(mac), 0)
mac += '0' * pad
return delimiter.join(
mac[x : x + 2]
for x in range(0, 12, 2)
)
def get_ple_prev_next(
playlist_songs,
*,
after=None,
before=None,
index=None,
position=None
):
if (
(after or before)
and position
):
raise ValueError(
"Must provide one or both of 'after'/'before' or one of 'index'/'position'."
)
if (
index is not None
and index not in range(-(len(playlist_songs)), len(playlist_songs) + 1)
):
raise ValueError(
f"'index' must be between {-len(playlist_songs)} and {len(playlist_songs)}."
)
if (
position is not None
and position not in range(1, len(playlist_songs) + 2)
):
raise ValueError(
f"'position' must be between 1 and {len(playlist_songs) + 1}."
)
prev = after or {}
next_ = before or {}
if prev or next_:
if prev:
index = playlist_songs.index(prev)
if not next_ and prev != playlist_songs[-1]:
index += 1
next_ = playlist_songs[index]
if next_:
index = playlist_songs.index(next_)
if not prev and next_ != playlist_songs[0]:
prev = playlist_songs[index]
else:
if position is not None:
if position == len(playlist_songs) + 1:
index = len(playlist_songs)
else:
index = position - 1
elif index is None or index == len(playlist_songs):
index = len(playlist_songs)
elif index < 0:
index = index % len(playlist_songs)
if index != 0:
prev = playlist_songs[index - 1]
if index != len(playlist_songs):
next_ = playlist_songs[index]
return prev, next_
PK ! ulQ Q $ google_music-3.0.1.dist-info/LICENSEThe MIT License (MIT)
Copyright (c) 2018-2019 thebigmunch
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
PK !HڽT U " google_music-3.0.1.dist-info/WHEEL
A
н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK !H' )
% google_music-3.0.1.dist-info/METADATAVkS8_mgyAȒ#$2kGmL+;СL,{=GW>E}yB2Hj4 9#0L2xz߲DL8"9)`LrD)81)
1\Tl;djc[ea%^+5f$FSc9=RA/L)zDUym`\ ePdgeۖcFTJ0R#g,zr)x(h$$cOJ_kSqĮ=5XX3dR3!ɇ?
]oZN&7_4ԱZ}j
]
iY?';EeEszzXX$~=S.ɅDwʍ\loHhKu֦լ^@YAA*irG(bZSGtΒ3}G؆zL:MGWѕД_SոG ).Fsb,U8C`%eNtGe|EbGqP_2$&(FU}3|i1n#7(8زC1.Dr(NgdǮmL9p+6V!7㮊Ŀ-V0IhRp.DD#b3(F<ʐe+ޓQ}ph!ØRVYf6+Ga#a(X'xqWʩ!OAJRͪ\v@xe&sЭBhAsIaoґ;kq&4/]R?AfEӍHGǁL-*4/"LWEW"9NJGBܦdv`nP燍fwqH¶ |z5:Oد͝1SJB4Cd@U&iXSRrqnhc