概述
流數據是一個在機器學習領域蓬勃發展的概念
學習如何使用PySpark來利用機器學習模型對流數據進行預測
我們將介紹流數據和Spark Streaming的基礎知識,然后深入到實現部分
引言
想象一下——每一秒都有8,500多條推文發布,900多張照片被上傳到Instagram,4,200多個Skype呼叫,78,000多次Google搜索,以及200多萬封電子郵件被發送(數據來自InternetLive Stats)。
我們正在以前所未有的速度和規模生產數據。這是在數據科學領域工作的大好時候!但是有了大量的數據后,接踵而至的是復雜的挑戰。
首要,如何收集大規模的數據?如何確保一旦生成并收集數據,機器學習管道就會繼續產生結果?這些都是業界面臨的重大挑戰,以及為什么流數據的概念在企業中越來越受到關注。
增加處理流數據的能力將極大地擴展當前的數據科學產品投資組合。這是業界急需的技能,若能熟練掌握它,將幫助你擔負起下一個數據科學角色。
因此,在本文中,我們將學習什么是流數據,了解Spark Streaming的基礎知識,然后在一個業界相關的數據集上使用Spark實現流數據。
什么是流數據?
社交媒體產生的數據是驚人的。你敢于想象存儲所有數據需要些什么嗎?這是一個復雜的過程!因此,在深入探討本文的Spark方面之前,先來理解什么是流數據。
流數據沒有離散的開始或結束。這些數據是每秒從數千個數據源中生成的,它們需要盡快進行處理和分析。大量流數據需要實時處理,例如Google搜索結果。
我們知道,在事件剛發生時一些見解會更有價值,而隨著時間的流逝它們會逐漸失去價值。以體育賽事為例——我們希望看到即時分析,即時統計見解,在那一刻真正享受比賽,對吧?
例如,假設你正在觀看一場羅杰·費德勒(Roger Federer)對戰諾瓦克·喬科維奇(Novak Djokovic)的激動人心的網球比賽。
這場比賽兩局打平,你想了解與費德勒的職業平均水平相比,其反手發球的百分比。是在幾天之后看到有意義,還是在決勝局開始前的那一刻看到有意義呢?
Spark Streaming的基礎知識
Spark Streaming是核心Spark API的擴展,可實現實時數據流的可伸縮和容錯流處理。
在轉到實現部分之前,先了解一下Spark Streaming的不同組成部分。
離散流
離散流(Dstream)是一個連續的數據流。對于離散流,其數據流可以直接從數據源接收,也可以在對原始數據進行一些處理后接收。
構建流應用程序的第一步是定義要從中收集數據的數據資源的批處理持續時間。如果批處理持續時間為2秒,則將每2秒收集一次數據并將其存儲在RDD中。這些RDD的連續序列鏈是一個DStream,它是不可變的,可以通過Spark用作一個分布式數據集。
考慮一個典型的數據科學項目。在數據預處理階段,我們需要轉換變量,包括將分類變量轉換為數字變量,創建分箱,去除異常值和很多其他的事。Spark保留了在數據上定義的所有轉換的歷史記錄。因此,無論何時發生故障,它都可以追溯轉換的路徑并重新生成計算結果。
我們希望Spark應用程序7 x 24小時持續運行。并且每當故障發生時,我們都希望它能盡快恢復。但是,在大規模處理數據的同時,Spark需要重新計算所有轉換以防出現故障。可以想象,這樣做的代價可能會非常昂貴。
緩存
這是應對該挑戰的一種方法。我們可以暫時存儲已計算(緩存)的結果,以維護在數據上定義的轉換的結果。這樣,當發生故障時,就不必一次又一次地重新計算這些轉換。
DStreams允許將流數據保留在內存中。當我們要對同一數據執行多種運算時,這很有用。
檢查點
高速緩存在正常使用時非常有用,但是它需要大量內存。并不是每個人都有數百臺具有128 GB內存的計算機來緩存所有內容。
檢查點的概念能夠有所幫助。
檢查點是另一種保留轉換后的數據框結果的技術。它將不時地將正在運行的應用程序的狀態保存在任何可靠的存儲介質(如HDFS)上。但是,它比緩存慢,靈活性也更差。
在擁有流數據時可以使用檢查點。轉換結果取決于先前的轉換結果,并且需要保存以供使用。此外,我們還存儲檢查點元數據信息,例如用于創建流數據的配置以及一系列DStream操作的結果等。
流數據的共享變量
有時候需要為必須在多個集群上執行的Spark應用程序定義諸如map,reduce或filter之類的函數。在函數中使用的變量會被復制到每臺機器(集群)中。
在這種情況下,每個集群都有一個不同的執行器,我們想要一些可以賦予這些變量之間關系的東西。
例如:假設Spark應用程序在100個不同的集群上運行,它們捕獲了來自不同國家的人發布的Instagram圖片。
現在,每個集群的執行者將計算該特定集群上的數據的結果。但是我們需要一些幫助這些集群進行交流的東西,以便獲得匯總結果。在Spark中,我們擁有共享變量,這些變量使此問題得以克服。
累加器變量
用例包括發生錯誤的次數,空白日志的數量,我們從特定國家收到請求的次數——所有這些都可以使用累加器解決。
每個集群上的執行程序將數據發送回驅動程序進程,以更新累加器變量的值。 累加器僅適用于關聯和可交換的運算。例如,對求和和求最大值有用,而求平均值不起作用。
廣播變量
當我們使用位置數據(例如城市名稱和郵政編碼的映射)時,這些是固定變量,是吧?現在,如果每次在任意集群上的特定轉換都需要這種類型的數據,我們不需要向驅動程序發送請求,因為它會太昂貴。
相反,可以在每個集群上存儲此數據的副本。這些類型的變量稱為廣播變量。
廣播變量允許程序員在每臺計算機上保留一個只讀變量。通常,Spark使用高效的廣播算法自動分配廣播變量,但是如果有任務需要多個階段的相同數據,也可以定義它們。
使用PySpark對流數據進行情感分析
是時候啟動你最喜歡的IDE了!讓我們在本節中進行編碼,并以實踐的方式理解流數據。
理解問題陳述
在本節我們將使用真實數據集。我們的目標是檢測推文中的仇恨言論。為了簡單起見,如果一條推文包含帶有種族主義或性別歧視情緒的言論,我們就認為該推文包含仇恨言論。
因此,任務是將種族主義或性別歧視的推文從其他推文中區分出來。我們將使用包含推文和標簽的訓練樣本,其中標簽“1”表示推文是種族主義/性別歧視的,標簽“0”則表示其他種類。
為什么這是一個與主題相關的項目?因為社交媒體平臺以評論和狀態更新的形式接收龐大的流數據。該項目將幫助我們審核公開發布的內容。
設置項目工作流程
1. 模型構建:構建邏輯回歸模型管道,對推文中是否包含仇恨言論進行分類。在這里,我們的重點不是建立一個完全準確的分類模型,而是了解如何在流數據上使用任意模型并返回結果
2. 初始化Spark Streaming的環境:一旦模型構建完成,需要定義獲取流數據的主機名和端口號
3. 流數據:接下來,從定義的端口添加來自netcat服務器的推文,SparkStreaming API將在指定的持續時間后接收數據
4. 預測并返回結果:一旦接收到推文,就將數據傳遞到創建的機器學習管道中,并從模型中返回預測的情緒
這是對工作流程的簡潔說明:
訓練數據以建立邏輯回歸模型
我們在一個CSV文件中存儲推文數據及其相應的標簽。使用邏輯回歸模型來預測推文是否包含仇恨言論。如果是,則模型預測標簽為1(否則為0)。你可以參考“面向初學者的PySpark”來設置Spark環境。
可以在這里下載數據集和代碼。
首先,需要定義CSV文件的模式。否則,Spark會將每列數據的類型都視為字符串。讀取數據并檢查模式是否符合定義:
# importing required libraries
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
# initializing spark session
sc = SparkContext(appName=“PySparkShell”)
spark = SparkSession(sc)
# define the schema
my_schema = tp.StructType([
tp.StructField(name=‘id’, dataType= tp.IntegerType(), nullable=True),
tp.StructField(name=‘label’, dataType= tp.IntegerType(), nullable=True),
tp.StructField(name=‘tweet’, dataType= tp.StringType(), nullable=True)
])
# read the dataset
my_data = spark.read.csv(‘twitter_sentiments.csv’,
schema=my_schema,
header=True)
# view the data
my_data.show(5)
# print the schema of the file
my_data.printSchema()
定義機器學習管道的各個階段
現在已經將數據保存在Spark數據框中,需要定義轉換數據的不同階段,然后使用它從模型中獲取預測的標簽。
在第一階段,使用RegexTokenizer將推特文本轉換為單詞列表。然后,從單詞列表中刪除停用詞并創建詞向量。在最后階段,使用這些詞向量來構建邏輯回歸模型并獲得預測的情緒。
記住——重點不是建立一個完全準確的分類模型,而是要看看如何在流數據上使用預測模型來獲取結果。
# define stage 1: tokenize the tweet text
stage_1 = RegexTokenizer(inputCol=‘tweet’ , outputCol=‘tokens’, pattern=‘\\W’)
# define stage 2: remove the stop words
stage_2 = StopWordsRemover(inputCol=‘tokens’, outputCol=‘filtered_words’)
# define stage 3: create a word vector of the size 100
stage_3 = Word2Vec(inputCol=‘filtered_words’, outputCol=‘vector’, vectorSize=100)
# define stage 4: Logistic Regression Model
model = LogisticRegression(featuresCol=‘vector’, labelCol=‘label’)
設置機器學習管道
讓我們在Pipeline對象中添加階段,然后按順序執行這些轉換。用訓練數據集擬合管道,現在,每當有了新的推文,只需要將其傳遞給管道對象并轉換數據即可獲取預測:
# setup the pipeline
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])
# fit the pipeline model with the training data
pipelineFit = pipeline.fit(my_data)
流數據和返回結果
假設每秒收到數百條評論,我們希望通過阻止用戶發布仇恨言論來保持平臺整潔。因此,每當我們收到新文本,都會將其傳遞到管道中并獲得預測的情緒。
我們將定義一個函數get_prediction,該函數將刪除空白句子并創建一個數據框,其中每一行都包含一條推文。
初始化Spark Streaming的環境并定義3秒的批處理持續時間。這意味著我們將對每3秒收到的數據進行預測:
# define a function to compute sentiments of the received tweets
defget_prediction(tweet_text):
try:
# filter the tweets whose length is greater than 0
tweet_text = tweet_text.filter(lambda x: len(x) 》0)
# create a dataframe with column name ‘tweet’ and each row will contain the tweet
rowRdd = tweet_text.map(lambda w: Row(tweet=w))
# create a spark dataframe
wordsDataFrame = spark.createDataFrame(rowRdd)
# transform the data using the pipeline and get the predicted sentiment
pipelineFit.transform(wordsDataFrame).select(‘tweet’,‘prediction’).show()
except :
print(‘No data’)
# initialize the streaming context
ssc = StreamingContext(sc, batchDuration=3)
# Create a DStream that will connect to hostname:port, like localhost:9991
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
# split the tweet text by a keyword ‘TWEET_APP’ so that we can identify which set of words is from a single tweet
words = lines.flatMap(lambda line : line.split(‘TWEET_APP’))
# get the predicted sentiments for the tweets received
words.foreachRDD(get_prediction)
# Start the computation
ssc.start()
# Wait for the computation to terminate
ssc.awaitTermination()
在一個終端上運行該程序,然后使用Netcat(用于將數據發送到定義的主機名和端口號的實用工具)。你可以使用以下命令啟動TCP連接:
nc -lk port_number
最后,在第二個終端中鍵入文本,你將在另一個終端中實時獲得預測。
完美!
結語
流數據在未來幾年只會越來越熱門,因此應該真正開始熟悉這一主題。請記住,數據科學不只是建立模型——整個流程都需要關注。
本文介紹了SparkStreaming的基礎知識以及如何在真實的數據集上實現它。我鼓勵大家使用另一個數據集或抓取實時數據來實現剛剛介紹的內容(你也可以嘗試其他模型)。
-
數據
+關注
關注
8文章
7134瀏覽量
89410 -
機器學習
+關注
關注
66文章
8438瀏覽量
132938
發布評論請先 登錄
相關推薦
評論