เริ่มต้นกับ Big Data กับ Python


หลังจากเป็นมือใหม่มาราวๆ 2 สัปดาห์ ก็พอจะมาบอกได้ว่ามันน่าจะเริ่มจากตรงไหนอะไรยังไงนะ เนื่องเพราะแฟนเรียน Big Data นั่นแหละ เราเลยต้องรู้ด้วย

เนื่องจากเคนใช้ iMac เพราะงั้นทุกโปรแกรมที่ใช้ก็จะอิงจาก Mac เป็นหลักนะ ใครใช้ Windows ก็อาจจะต้องไปลองหาๆ เพิ่มเติมเอานะ

จะพยายามพิมพ์เอาไว้เพื่อให้อ่านแล้วเริ่มต้นได้ง่ายที่สุดนะ และทุกอย่างที่อยู่ในบทความเป็นความเข้าใจจากที่เรียนรู้มา #รู้ไม่มากแต่อยากมาเล่า 555+ ซึ่งถ้าผิด ก็ไว้จะมาอัพเดทแก้นะ

ประเภทของข้อมูล Big Data

  • ปริมาณ (Volume)
  • ความเร็ว (Velocity)
  • ความหลากหลาย (Variety)
  • ความถูกต้อง (Veracity)
  • คุณค่า (Value)
  • ความแปรผันได้ (Variability)

สิ่งที่ต้องรู้จักเบื้องต้น

  • โปรแกรม Virtual Box / Parallels สำหรับสร้างเครื่องเสมือนบนเครื่องของเราเพื่อติดตั้ง Ubuntu ใช้ตัวไหนก็ได้ แต่ตัว Parallels ที่เป็นตัวเสียเงิน ดีกว่ามาก
  • OS Linux – Ubuntu เวอร์ชั่น LTS
  • Hadoop
  • Mysql / MongoDB
  • Python
  • Package ต่างๆ เช่น pandas, numpy, matplotlib, pyspark ฯลฯ
  • Sqoop
  • Hive

Hadoop

คือ โปรแกรมบริหารจัดการเครื่อง Server หลายๆ เครื่องให้มันทำงานร่วมกัน ทำให้เรามองเห็นเป็นเครื่องๆ เดียว ทำให้เราเก็บข้อมูลขนาดใหญ่ และกระจายไปเก็บในหลายๆ เครื่อง ทำให้เก็บข้อมูลได้ใหญ่ขึ้น และเข้าถึงได้เร็วขึ้น โดยตัวฮาดูปจะมีที่เก็บไฟล์ของตัวเองชื่อ HDFS (Hadoop Distributed File System)

เริ่มขั้นตอนติดตั้งซอฟท์แวร์

สำหรับขั้นตอนการติดตั้งเคนทำเป็นขั้นตอนไว้ใน Github เรียบร้อยแล้ว สามารถเข้าไปดูได้เลย https://github.com/kanexkane/big-data-101

PySpark กับ Pandas ต่างกันไหม

เท่าที่รู้ตอนนี้คือความสามารถเหมือนกันเลย ใช้อ่านและประมวลผลข้อมูลเหมือนกัน คำสั่งยังคล้ายๆ กันเลย อ่านๆ ไป ก็เจอแค่ว่า PySpark มันเร็วกว่า Pandas มาก แค่นั้นเอง

PySpark

โค้ดสำหรับเริ่มการใช้ pyspark ใน Google Colab

# ติดตั้ง pyspark ใน Google Colab
!pip install pyspark

# สร้าง Session ใส่ตัวแปร spark
spark = SparkSession.builder.appName("APP_NAME_MASTER").getOrCreate()

# แสดงผลออกมาดู
spark

ใช้อะไรบ้างก็ import เข้าไป

คืออันนี้มันจะเกินๆ หน่อยนะ แต่ถ้าอันไหนใครไม่ใช้ก็ตัดออกละกัน อันนี้เป็นอันที่เราใช้ทั้งหมดที่เคยเขียนโค้ดมา

from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import isnan, count, col, regexp_extract, when, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, QuantileDiscretizer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, NaiveBayes, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, ClusteringEvaluator
from pyspark.ml.regression import GeneralizedLinearRegression

อ่านไฟล์ และทำความสะอาด

# อ่านไฟล์ที่ต้องการ
df = spark.read.csv('E-Commerce Data.csv', inferSchema=True, header=True)

# แสดงโครงสร้างของไฟล์ที่อ่านได้ออกมาดู
df.printSchema()

# เช็คว่ามีคอลัมภ์ไหนมีค่าว่างบ้าง
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# เพิ่มคอลัมภ์ โดยใช้เงื่อนไขร่วมด้วย
df = df.withColumn("Wanted",
      when(
        (df["Quantity"] > 50)
      , 1).otherwise(0))

# เติมค่าช่องว่าง 'Column': 'Value'
df = df.na.fill({'Description': 'NO DESCRIPTION', 'CustomerID': 0 })

ทำคอลัมภ์ Features

# แปลงฟิลด์ที่ต้องการใช้เป็น Features จาก String เป็น Index โดยการวนลูป
indexers = [
  StringIndexer(inputCol=column, outputCol=column+"_index").fit(df)
  for column in ['StockCode', 'Country']
]

# ทำ One Hot Encoder สำหรับ Feature ที่ StringIndex มีค่ามากกว่า 2 ตัว
one_hot_encoders = OneHotEncoder(
    inputCols=[column+"_index" for column in ['StockCode', 'Country'] ],
    outputCols=[
        column+"_prep" for column in ['StockCode', 'Country']]
)

# ประกอบ Feature เป็นคอลัมภ์ จะได้เอาเข้า dataframe ได้
assembler = VectorAssembler(
    inputCols=['StockCode_prep', 'Country_prep', 'Quantity'],
    outputCol=feature_column
)

# ต่อท่อ
pipeline = Pipeline(stages=indexers + [one_hot_encoders, assembler])

# ทำงานตามท่อ และปรับรูปจาก DataFrame สร้างฟิลด์ features ขึ้นมา
feature_df = pipeline.fit(df).transform(df)

กำหนดคอลัมภ์ feature, label เพื่อง่ายต่อการเขียน

# เลือกคอลัมภ์ที่จะเอามาเป็นผลลัพธ์ที่ต้องการ
label_column = 'Wanted'
feature_column = 'features'
prediction_column = 'prediction'

แบ่งข้อมูลเพื่อฝึกและทดสอบ

# แยกข้อมูลเพื่อเทรน และเทส อัตราส่วน 7/3
(trainingData, testData) = feature_df.randomSplit([0.7, 0.3])

เทคนิค Logistic Regression

# เทคนิค Logistic regression
lr = LogisticRegression(labelCol=label_column, featuresCol=feature_column)
# เทรน
lr_model = lr.fit(trainingData)
# เทส prediction
lr_prediction = lr_model.transform(testData)
# ประเมินผลโมเดล Logistic Regression ว่าตรงกี่เปอร์เซ็นต์
lr_evaluator = MulticlassClassificationEvaluator(labelCol=label_column, predictionCol=prediction_column)
# ประเมินผลโมเดล Logistic Regression ว่าตรงกี่เปอร์เซ็นต์
lr_evaluator = MulticlassClassificationEvaluator(labelCol=label_column, predictionCol=prediction_column)

lr_evaluator_f1 = lr_evaluator.evaluate(lr_prediction, {lr_evaluator.metricName: "f1"})

lr_evaluator_accuracy = lr_evaluator.evaluate(lr_prediction, {lr_evaluator.metricName: "accuracy"})

lr_evaluator_precision = lr_evaluator.evaluate(lr_prediction, {lr_evaluator.metricName: "precisionByLabel"})

lr_evaluator_recall = lr_evaluator.evaluate(lr_prediction, {lr_evaluator.metricName: "recallByLabel"})
print("Accuracy of Logistic Regression is = %g" % (lr_evaluator_accuracy*100), '%')

print("Precision of Logistic Regression = %g" %  (lr_evaluator_precision*100), '%')

print("F1 of Logistic Regression = %g" %  (lr_evaluator_f1*100), '%')

print("Recall of Logistic Regression = %g" %  (lr_evaluator_recall*100), '%')

print("Total rows of Logistic Regression prediction = %g" %  lr_prediction.count())

5 2 votes
Article Rating
0
Would love your thoughts, please comment.x
()
x