Przesyłanie strumieniowe¶
Stream
pozwala na filtering oraz sampling dla Tweetów w czasie rzeczywistym używając API Twittera.
Strumienie wykorzystują protokół Streaming HTTP do dostarczania danych poprzez otwarte, strumieniowe połączenie API. Zamiast dostarczać dane w partiach przez powtarzające się żądania aplikacji klienta, jak można oczekiwać od REST API, pojedyncze połączenie jest otwarte między aplikacją a API, a nowe wyniki są wysyłane przez to połączenie, gdy tylko wystąpią nowe dopasowania. Rezultatem tego jest mechanizm dostarczania z niskim opóźnieniem, który może obsługiwać bardzo dużą przepustowość. Aby uzyskać więcej informacji, zobacz https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data
Używanie Stream
¶
Aby użyć Stream
, jego instancja musi być zainicjowana z danymi uwierzytelniającymi API Twittera (Consumer Key, Consumer Secret, Access Token, Access Token Secret):
import tweepy
stream = tweepy.Stream(
"Consumer Key here", "Consumer Secret here",
"Access Token here", "Access Token Secret here"
)
Następnie, Stream.filter()
lub Stream.sample()
mogą być użyte do połączenia się i uruchomienia strumienia:
stream.filter(track=["Tweepy"])
Dane otrzymane ze strumienia są przekazywane do Stream.on_data()
. Metoda ta zajmuje się wysyłaniem danych do innych metod w oparciu o typ wiadomości. Na przykład, jeśli ze strumienia odbierany jest Tweet, surowe dane są wysyłane do Stream.on_data()
, która konstruuje obiekt Status
i przekazuje go do Stream.on_status()
. Domyślnie, pozostałe metody, poza Stream.on_data()
, które odbierają dane ze strumienia, po prostu logują otrzymane dane, z poziomem ogging level zależnym od typu danych.
Aby dostosować przetwarzanie danych strumienia, Stream
musi zostać podklasowane. Na przykład, aby zapisać ID każdego otrzymanego Tweeta:
class IDPrinter(tweepy.Stream):
def on_status(self, status):
print(status.id)
printer = IDPrinter(
"Consumer Key here", "Consumer Secret here",
"Access Token here", "Access Token Secret here"
)
printer.sample()
Wątkowanie¶
Zarówno Stream.filter()
jak i Stream.sample()
posiadają parametr threaded
. Kiedy ustawiony jest na True
, strumień będzie działał w oddzielnym thread
, który jest zwracany przez wywołanie którejkolwiek z metod. Na przykład::odpowiednia ilość czasu. Domyślnie, wszystkie trzy z tych metod rejestrują błąd. Aby dostosować obsługę, mogą one zostać nadpisane w podklasie:
thread = stream.filter(follow=[1072250532645998596], threaded=True)
Obsługa błędów¶
Stream
posiada wiele metod do obsługi błędów podczas strumieniowania. Stream.on_closed()
jest wywoływany, gdy strumień jest zamykany przez Twittera. Stream.on_connection_error()
jest wywoływany, gdy strumień napotka błąd połączenia. Stream.on_request_error()
jest wywoływany, gdy napotkany zostanie błąd podczas próby połączenia się ze strumieniem. Kiedy te błędy są napotkane i max_retries
, który domyślnie jest nieskończony, nie został jeszcze przekroczony, instancja Stream
będzie próbowała ponownie połączyć się ze strumieniem po odpowiednim czasie. Domyślnie, wszystkie trzy z tych metod rejestrują błąd. Aby dostosować obsługę, mogą one zostać nadpisane w podklasie:
class ConnectionTester(tweepy.Stream):
def on_connection_error(self):
self.disconnect()
Stream.on_request_error()
jest również przekazywany kod statusu HTTP, który został napotkany. Odniesienie do kodów statusu HTTP dla API Twittera można znaleźć na stronie https://developer.twitter.com/en/support/twitter-api/error-troubleshooting.
Stream.on_exception()
jest wywoływany, gdy wystąpi nieobsługiwany wyjątek. Jest to fatalne dla strumienia, a domyślnie wyjątek jest rejestrowany.