fixing fitbit

This commit is contained in:
2023-11-28 20:10:38 +00:00
parent 664b6d4a5b
commit ccedbd7381
9 changed files with 27 additions and 342 deletions

View File

@@ -1 +0,0 @@
[![Build Status](http://droneci.service.dc1.consul/api/badges/sstent/fitbit-collect/status.svg)](http://droneci.service.dc1.consul/sstent/fitbit-collect)

View File

@@ -1 +1 @@
{"client_id": "22BQMP", "client_secret": "280a9e3702af04f687a84862c3f6f6ac"} {"client_id": "22BQM9", "client_secret": "14de737c439532c8fab377940425f97a"}

View File

@@ -1 +1 @@
{"installed":{"client_id":"182877671696-qj1oq6pi50s6v7nk16m59ulmg28klo0r.apps.googleusercontent.com","project_id":"quickstart-1588344492360","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"oSI3LMMY9caNiGgH0NKSO3oS","redirect_uris":["urn:ietf:wg:oauth:2.0:oob","http://localhost"]}} {"installed":{"client_id":"807888907647-3heepla8temofm6gdt14cm8knjgqhmb6.apps.googleusercontent.com","project_id":"abodyinmotion-basis-530","auth_uri":"https://accounts.google.com/o/oauth2/auth","token_uri":"https://oauth2.googleapis.com/token","auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs","client_secret":"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","redirect_uris":["http://localhost"]}}

View File

@@ -1,158 +0,0 @@
#!/usr/bin/env python
"""
Note: This file was adapted from the unoffiicial Python Fitbit client Git repo:
https://raw.githubusercontent.com/orcasgit/python-fitbit/master/gather_keys_oauth2.py
"""
import cherrypy
import os
import sys
import threading
import traceback
import webbrowser
import json
from urllib.parse import urlparse
# import urllib.parse as urlparse
from base64 import b64encode
from fitbit.api import Fitbit, FitbitOauth2Client
from oauthlib.oauth2.rfc6749.errors import MismatchingStateError, MissingTokenError
CLIENT_DETAILS_FILE = 'client_details.json' # configuration for for the client
USER_DETAILS_FILE = 'user_details.json' # user details file
class OAuth2Server:
def __init__(self, client_id, client_secret,
redirect_uri='http://127.0.0.1:8080/'):
""" Initialize the FitbitOauth2Client """
self.success_html = """
<h1>You are now authorized to access the Fitbit API!</h1>
<br/><h3>You can close this window</h3>"""
self.failure_html = """
<h1>ERROR: %s</h1><br/><h3>You can close this window</h3>%s"""
self.fitbit = Fitbit(
client_id,
client_secret,
redirect_uri=redirect_uri,
timeout=10,
)
self.redirect_uri = redirect_uri
self.oauth = FitbitOauth2Client(client_id, client_secret)
def headless_authorize(self):
"""
Authorize without a display using only TTY.
"""
url, _ = self.oauth.authorize_token_url(redirect_uri=self.redirect_uri)
# Ask the user to open this url on a system with browser
print('\n-------------------------------------------------------------------------')
print('\t\tOpen the below URL in your browser\n')
print(url)
print('\n-------------------------------------------------------------------------\n')
print('NOTE: After authenticating on Fitbit website, you will redirected to a URL which ')
print('throws an ERROR. This is expected! Just copy the full redirected here.\n')
redirected_url = input('Full redirected URL: ')
params = urlparse.parse_qs(urlparse.urlparse(redirected_url).query)
print(params['code'][0])
self.authenticate_code(code=params['code'][0])
def browser_authorize(self):
"""
Open a browser to the authorization url and spool up a CherryPy
server to accept the response
"""
url, _ = self.fitbit.client.authorize_token_url()
# Open the web browser in a new thread for command-line browser support
threading.Timer(1, webbrowser.open, args=(url,)).start()
print()
print('URL for authenticating is:')
print(url)
print()
# Same with redirect_uri hostname and port.
urlparams = urlparse(self.redirect_uri)
cherrypy.config.update({'server.socket_host': '0.0.0.0',
'server.socket_port': urlparams.port})
cherrypy.quickstart(self)
def authenticate_code(self, code=None):
"""
Final stage of authentication using the code from Fitbit.
"""
try:
self.oauth.fetch_access_token(code, self.redirect_uri)
except MissingTokenError:
error = self._fmt_failure(
'Missing access token parameter.</br>Please check that '
'you are using the correct client_secret'
)
except MismatchingStateError:
error = self._fmt_failure('CSRF Warning! Mismatching state')
@cherrypy.expose
def index(self, state, code=None, error=None):
"""
Receive a Fitbit response containing a verification code. Use the code
to fetch the access_token.
"""
error = None
if code:
try:
self.fitbit.client.fetch_access_token(code)
except MissingTokenError:
error = self._fmt_failure(
'Missing access token parameter.</br>Please check that '
'you are using the correct client_secret')
except MismatchingStateError:
error = self._fmt_failure('CSRF Warning! Mismatching state')
else:
error = self._fmt_failure('Unknown error while authenticating')
# Use a thread to shutdown cherrypy so we can return HTML first
self._shutdown_cherrypy()
return error if error else self.success_html
def _fmt_failure(self, message):
tb = traceback.format_tb(sys.exc_info()[2])
tb_html = '<pre>%s</pre>' % ('\n'.join(tb)) if tb else ''
return self.failure_html % (message, tb_html)
def _shutdown_cherrypy(self):
""" Shutdown cherrypy in one second, if it's running """
if cherrypy.engine.state == cherrypy.engine.states.STARTED:
threading.Timer(1, cherrypy.engine.exit).start()
if __name__ == '__main__':
if not (len(sys.argv) == 3):
print("Arguments: client_id and client_secret")
sys.exit(1)
client_id = sys.argv[1]
client_secret = sys.argv[2]
server = OAuth2Server(client_id, client_secret)
# server.headless_authorize()
server.browser_authorize()
profile = server.fitbit.user_profile_get()
print('You are authorized to access data for the user: {}'.format(
profile['user']['fullName']))
print('TOKEN\n=====\n')
for key, value in server.fitbit.client.session.token.items():
print('{} = {}'.format(key, value))
print("Writing client details to file for usage on next collection.")
client_details = {'client_id': client_id, 'client_secret': client_secret} # Details of application
with open(CLIENT_DETAILS_FILE, 'w') as f:
json.dump(client_details, f)
print("Writing user details to file for usage on next collection.")
with open(USER_DETAILS_FILE, 'w') as f:
json.dump(server.fitbit.client.session.token, f)

View File

@@ -1,71 +0,0 @@
from __future__ import print_function
import pickle
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
# If modifying these scopes, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/spreadsheets']
# The ID and range of a sample spreadsheet.
SAMPLE_SPREADSHEET_ID = '1YkMf_3m2YroHhtyS2FrdLzm3HnJDk4-q8r4cSagYrrg'
SAMPLE_RANGE_NAME = 'fitbit_export!A2:E'
def UpdateSheet():
"""Shows basic usage of the Sheets API.
Prints values from a sample spreadsheet.
"""
creds = None
# The file token.pickle stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as token:
creds = pickle.load(token)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open('token.pickle', 'wb') as token:
pickle.dump(creds, token)
service = build('sheets', 'v4', credentials=creds)
# Call the Sheets API
sheet = service.spreadsheets()
result = sheet.values().get(spreadsheetId=SAMPLE_SPREADSHEET_ID,
range=SAMPLE_RANGE_NAME).execute()
values = result.get('values', [])
if not values:
print('No data found.')
else:
print('Name, Major:')
for row in values:
# Print columns A and E, which correspond to indices 0 and 4.
print('%s, %s' % (row[0], row[4]))
values = [
[
"TEST", "TEST"
],
]
body = {
'values': values
}
result = service.spreadsheets().values().append(
spreadsheetId=SAMPLE_SPREADSHEET_ID,range=SAMPLE_RANGE_NAME,
valueInputOption='RAW', body=body).execute()
print('{0} cells appended.'.format(result \
.get('updates') \
.get('updatedCells')))
if __name__ == '__main__':
main()

View File

@@ -1,47 +0,0 @@
import consul
import json
USER_DETAILS_FILE = 'user_details.json' # user details file
c = consul.Consul(host='192.168.1.237')
# def _get_user_details():
# """
# The specific user that you want to retrieve data for.
# """
# with open(USER_DETAILS_FILE) as f:
# fitbit_user = json.load(f)
# access_token = fitbit_user['access_token']
# refresh_token = fitbit_user['refresh_token']
# expires_at = fitbit_user['expires_at']
# return access_token, refresh_token, expires_at
def _set_user_details(access_token, refresh_token, expires_at):
c.kv.put('access_token', access_token)
c.kv.put('refresh_token', refresh_token)
c.kv.put('expires_at', str(expires_at))
def _get_user_details():
access_token = c.kv.get('access_token')[1]["Value"].decode("utf-8")
refresh_token = c.kv.get('refresh_token')[1]["Value"].decode("utf-8")
expires_at = c.kv.get('expires_at')[1]["Value"].decode("utf-8")
return access_token, refresh_token, expires_at
# access_token, refresh_token, expires_at = _get_user_details()
# store_user_details(access_token, refresh_token, expires_at)
access_token, refresh_token, expires_at = _get_user_details()
print(access_token)
print(expires_at)
print(refresh_token)
# print(access_token[1]["Value"].decode("utf-8"))
# print(refresh_token[1]["Value"].decode("utf-8"))
# print(expires_at[1]["Value"].decode("utf-8"))

View File

@@ -1,58 +0,0 @@
import json
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
def plot_steps(data):
steps = data['activities-log-steps']
x = [datetime.strptime(d['dateTime'], '%Y-%M-%d').strftime("%A") for d in steps]
y = [float(d['value']) for d in steps]
plt.bar(x,y)
plt.title('Steps last 7 days')
plt.show()
def plot_sleep(data):
sleep = data['sleep']
x = [datetime.strptime(d['dateOfSleep'], '%Y-%M-%d').strftime("%A") for d in sleep][::-1]
deep = [float(d['levels']['summary']['deep']['minutes'])/60.0 for d in sleep][::-1]
light = [float(d['levels']['summary']['light']['minutes'])/60.0 for d in sleep][::-1]
rem = [float(d['levels']['summary']['rem']['minutes'])/60.0 for d in sleep][::-1]
awake = [float(d['levels']['summary']['wake']['minutes'])/60.0 for d in sleep][::-1]
barWidth = 0.15
r1 = np.arange(len(deep))
r2 = [x + barWidth for x in r1]
r3 = [x + barWidth for x in r2]
r4 = [x + barWidth for x in r3]
# Make the plot
plt.bar(r1, awake, color='#ffa600', width=barWidth, edgecolor='white', label='Awake')
plt.bar(r2, rem, color='#ff6361', width=barWidth, edgecolor='white', label='REM')
plt.bar(r3, light, color='#bc5090', width=barWidth, edgecolor='white', label='Light')
plt.bar(r4, deep, color='#003f5c', width=barWidth, edgecolor='white', label='Deep')
# Add xticks on the middle of the group bars
plt.xlabel('Day', fontweight='bold')
plt.ylabel('Hours', fontweight='bold')
plt.xticks([r + barWidth for r in range(len(deep))], x)
# Create legend & Show graphic
plt.legend()
plt.show()
if __name__ == '__main__':
with open('fitbit_data.json') as f:
data = json.load(f)
#plot_steps(data)
plot_sleep(data)

30
fitbit-collect/run_collect.py Normal file → Executable file
View File

@@ -1,3 +1,6 @@
#! /usr/bin/env nix-shell
#! nix-shell -i python3 -p python310Packages.google-api-python-client -p python310Packages.fitbit -p python310Packages.consul -p python310Packages.google-auth-oauthlib
""" """
Script to retrieve Fitbit data for the given user Script to retrieve Fitbit data for the given user
""" """
@@ -12,6 +15,7 @@ from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request from google.auth.transport.requests import Request
from datetime import date, timedelta from datetime import date, timedelta
import time
import consul import consul
@@ -33,7 +37,7 @@ SAMPLE_RANGE_NAME = 'fitbit_export!A2:E'
FITBIT_API = 'https://api.fitbit.com' FITBIT_API = 'https://api.fitbit.com'
CLIENT_DETAILS_FILE = 'client_details.json' # configuration for for the client CLIENT_DETAILS_FILE = 'client_details.json' # configuration for for the client
USER_DETAILS_FILE = 'user_details.json' # user details file #USER_DETAILS_FILE = 'user_details.json' # user details file
RESULT_FILE = 'fitbit_data.json' # The place where we will place the results RESULT_FILE = 'fitbit_data.json' # The place where we will place the results
@@ -84,7 +88,9 @@ def _get_api_call():
# profile_info = '/1/user/-/profile.json' # profile_info = '/1/user/-/profile.json'
# # Sleep between two dates # # Sleep between two dates
# sleep_dates = '/1.2/user/-/sleep/date/2020-04-13/2020-04-17.json' # sleep_dates = '/1.2/user/-/sleep/date/2020-04-13/2020-04-17.json'
weight = '/1.2/user/-/body/log/weight/date/' + str(date_start) + '/' + str(date_end) + '.json' weight = '/1/user/-/body/log/weight/date/' + str(date_start) + '/' + str(date_end) + '.json'
print("API:", weight)
return weight return weight
@@ -196,10 +202,24 @@ def run():
if __name__ == '__main__': if __name__ == '__main__':
date_start = date.today() - timedelta(10)
date_start = date.today() - timedelta(10)
date_end = date.today() date_end = date.today()
print("Grabbin': ", date_start, date_end) # print("Grabbin': ", date_start, date_end)
run() run()
# ###Bulk import
# date_start = date(2022, 8, 1)
# end_date = date(2023, 11, 1)
# print(date_start, end_date)
# #set first block of 30days
# date_end = date_start + timedelta(days=30)
# while date_end < end_date:
# print(date_start, date_end)
# run()
# time.sleep(10)
# date_start = date_start + timedelta(days=30)
# date_end = date_end + timedelta(days=30)

Binary file not shown.