1
+ import functools
1
2
import json
2
3
import logging
3
- import socket
4
+ from datetime import datetime
4
5
5
6
import aioredis
6
- from aiotestspeed .aio import Speedtest
7
+ import requests
8
+ from aioredis .exceptions import ResponseError
7
9
from fastapi import FastAPI
8
10
from pydantic import BaseSettings
9
11
10
12
11
- SPEEDTEST_KEY = 'speedtest:{ip}'
13
+ TIMESERIES_KEY = 'is-bitcoin-lit:sentiment:mean:{time}'
14
+ SUMMARY_KEY = 'is-bitcoin-lit:summary:hourly:{time}'
15
+ SENTIMENT_API_URL = 'https://api.senticrypt.com/v1/history/bitcoin-{time}.json'
16
+ TIME_FORMAT_STRING = '%Y-%m-%d_%H'
12
17
13
18
14
19
class Config (BaseSettings ):
@@ -18,34 +23,45 @@ class Config(BaseSettings):
18
23
logger = logging .getLogger (__name__ )
19
24
config = Config ()
20
25
app = FastAPI (title = 'FastAPI Redis Tutorial' )
21
- redis = aioredis .from_url (config .redis_url )
26
+ redis = aioredis .from_url (config .redis_url , decode_responses = True )
22
27
23
28
24
- def get_cache_key ():
25
- hostname = socket .gethostname ()
26
- ip = socket .gethostbyname (hostname )
27
- return SPEEDTEST_KEY .format (ip = ip )
29
+ def make_summary (data ):
30
+ return {
31
+ 'time' : datetime .now ().timestamp (),
32
+ 'mean_sentiment' : sum (d ['mean' ] for d in data ) / len (data ),
33
+ }
28
34
29
35
30
- @app .get ('/speedtest' )
31
- async def speedtest ():
32
- logger .debug ('Running speedtest' )
33
- key = get_cache_key ()
36
+ @app .get ('/is-bitcoin-lit' )
37
+ async def bitcoin ():
38
+ sentiment_time = datetime .now ().strftime (TIME_FORMAT_STRING )
39
+ summary_key = SUMMARY_KEY .format (time = sentiment_time )
40
+ ts_key = TIMESERIES_KEY .format (time = sentiment_time )
41
+ url = SENTIMENT_API_URL .format (time = sentiment_time )
34
42
35
- found = await redis .get (key )
36
- if found :
37
- data = json .loads (found )
38
- else :
39
- s : Speedtest = await Speedtest ()
40
- await s .get_best_server ()
41
- await s .download ()
42
- await s .upload ()
43
+ summary = await redis .hgetall (summary_key )
43
44
44
- data = {
45
- 'ping_ms' : s .results .ping ,
46
- 'download_mbps' : s .results .download / 1000.0 / 1000.0 / 1 ,
47
- 'upload_mbps' : s .results .upload / 1000.0 / 1000.0 / 1 ,
48
- }
49
- await redis .set (key , json .dumps (data ), ex = 30 )
45
+ if not summary :
46
+ # TODO: Only add timeseries data that we don't already have -- how?
47
+ data = requests .get (url ).json ()
48
+ summary = make_summary (data )
49
+ await redis .hset (summary_key , mapping = summary )
50
+ await redis .expire (summary_key , 60 )
51
+ partial = functools .partial (redis .execute_command , 'TS.MADD' , ts_key )
52
+ for datapoint in data :
53
+ partial = functools .partial (
54
+ partial , datapoint ['timestamp' ], datapoint ['mean' ],
55
+ )
56
+ await partial ()
50
57
51
- return data
58
+ return summary
59
+
60
+
61
+ @app .on_event ('startup' )
62
+ async def startup_event ():
63
+ try :
64
+ redis .execute_command ('TS.CREATE' , TIMESERIES_KEY )
65
+ except ResponseError :
66
+ # Time series already exists
67
+ pass
0 commit comments