Tutorial Spark#
Para realizar este tutorial abrimos un cuaderno de Jupyter https://jupyter.org/ en nuestro servidor local o utilizamos Colaboratory: https://colab.research.google.com.
Los pasos son los siguientes:
Actualizamos nuestro sistema operativo e instalamos Java y Scala
apt update
Cell In[1], line 1
apt update
^
SyntaxError: invalid syntax
apt upgrade --fix-broken
apt install default-jre
apt autoremove
apt install default-jdk
apt install scala
java -version
openjdk version "11.0.13" 2021-10-19
OpenJDK Runtime Environment (build 11.0.13+8-Ubuntu-0ubuntu1.20.04)
OpenJDK 64-Bit Server VM (build 11.0.13+8-Ubuntu-0ubuntu1.20.04, mixed mode, sharing)
javac -version
javac 11.0.13
scala -version
readlink -f $(which java)
/usr/lib/jvm/java-11-openjdk-amd64/bin/java
Descargamos e instalamos Spark
wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
wget https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz.asc
--2022-02-23 15:19:24-- https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz.asc
Resolviendo downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.95.219, 2a01:4f9:3a:2c57::2, ...
Conectando con downloads.apache.org (downloads.apache.org)[135.181.214.104]:443... conectado.
Petición HTTP enviada, esperando respuesta... 200 OK
Longitud: 833 [text/plain]
Guardando como: “spark-3.2.1-bin-hadoop3.2.tgz.asc.1”
spark-3.2.1-bin-had 100%[===================>] 833 --.-KB/s en 0s
2022-02-23 15:19:25 (119 MB/s) - “spark-3.2.1-bin-hadoop3.2.tgz.asc.1” guardado [833/833]
wget https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz.sha512
wget https://downloads.apache.org/spark/KEYS
--2022-02-23 15:19:26-- https://downloads.apache.org/spark/KEYS
Resolviendo downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.95.219, 2a01:4f9:3a:2c57::2, ...
Conectando con downloads.apache.org (downloads.apache.org)[135.181.214.104]:443... conectado.
Petición HTTP enviada, esperando respuesta... 200 OK
Longitud: 101227 (99K)
Guardando como: “KEYS.1”
KEYS.1 100%[===================>] 98,85K 121KB/s en 0,8s
2022-02-23 15:19:28 (121 KB/s) - “KEYS.1” guardado [101227/101227]
gpg --import KEYS
gpg --verify spark-3.2.1-bin-hadoop3.2.tgz.asc spark-3.2.1-bin-hadoop3.2.tgz
gpg: Firmado el jue 20 ene 2022 21:10:50 CET
gpg: usando RSA clave CEA888BDB32D983C7F094564AC01E6E9139F610C
gpg: Firma correcta de "Huaxin Gao (CODE SIGNING KEY) <huaxin.gao11@gmail.com>" [desconocido]
gpg: ATENCIÓN: ¡Esta clave no está certificada por una firma de confianza!
gpg: No hay indicios de que la firma pertenezca al propietario.
Huellas dactilares de la clave primaria: CEA8 88BD B32D 983C 7F09 4564 AC01 E6E9 139F 610C
ls
tar xvf spark-3.2.1-bin-hadoop3.2.tgz
Instalamos e configuramos la biblioteca de Python findspark
pip install -q findspark
pwd
/home/usuario/gitRepos/spark/sparkFiles
ls
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.1-bin-hadoop3.2"
Utilizamos la biblioteca findspark para crear un dataframe
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([{"Hola": "mundo"} for x in range(1000)])
df.show(3, False)
Extraer, transformar y selecionar#
Importamos la biblioteca pyspark
import pyspark
print(pyspark.__version__)
3.2.1
Transformar#
Transformar significa escalar, convertir o modificar las características de los datos.
Uno de los primeros pasos en NLP (Natural Language Processing) es convertir el texto en tokens o palabras tokenizadas.
from pyspark.ml.feature import Tokenizer
oraciones_df = spark.createDataFrame([
(1, "Introducción a sparkMlib"),
(2, "Mlib incluye bibliotecas para clasificación y regresión"),
(3, "También incluye soporte a datapipe lines"),
], ["id", "oraciones"])
oraciones_df.show()
Para reflejar la importancia de una palabra en un texto utilizamos Term frequency-inverse document frequency (TF-IDF).
sent_token = Tokenizer(inputCol = "oraciones", outputCol = "palabras")
sent_tokenized_df = sent_token.transform(oraciones_df)
sent_tokenized_df.take(10)
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol = "palabras", outputCol = "rawfeatures", numFeatures = 20)
sent_fhTF_df = hashingTF.transform(sent_tokenized_df)
sent_fhTF_df.take(1)
idf = IDF(inputCol = "rawfeatures", outputCol = "idffeatures")
idfModel = idf.fit(sent_fhTF_df)
tfidf_df = idfModel.transform(sent_fhTF_df)
tfidf_df.take(1)
Utilizamos la clase StandardScaler para estandarizar datos en ML.
Los escala entre -1 y 1.
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
features_df = spark.createDataFrame([
(1, Vectors.dense([10.0,10000.0,1.0]),),
(2, Vectors.dense([20.0,40000.0,2.0]),),
(3, Vectors.dense([30.0,50000.0,3.0]),),
],["id", "features"] )
features_stand_scaler = StandardScaler(inputCol = "features", outputCol = "sfeatures", withStd=True, withMean=True)
stmodel = features_stand_scaler.fit(features_df)
stand_sfeatures_df = stmodel.transform(features_df)
stand_sfeatures_df.show()
Fuente: MA Raza, Ph.D. https://towardsdatascience.com/machine-learning-with-spark-f1dbc1363986.
Spark docs: https://spark.apache.org/docs/3.0.1/ml-features.html#standardscaler.
Utilizamos la clase MinMaxScaler en ML para normalizar datos numéricos.
Escala los datos entre 0 y 1.
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
features_df = spark.createDataFrame([
(1, Vectors.dense([10.0,10000.0,1.0]),),
(2, Vectors.dense([20.0,40000.0,2.0]),),
(3, Vectors.dense([30.0,50000.0,3.0]),),
],["id", "features"] )
features_df.show()
Aplicamos la transformación de la biblioteca MinMaxScaler.
features_scaler = MinMaxScaler(inputCol = "features", outputCol = "sfeatures")
smodel = features_scaler.fit(features_df)
sfeatures_df = smodel.transform(features_df)
sfeatures_df.show()
Fuente: https://spark.apache.org/docs/3.0.1/ml-features.html#minmaxscaler
El ejemplo completo se encuentra en examples/src/main/python/ml/min_max_scaler_example.py dentro del repositorio de Spark.
La clase Bucketizer transforma los datos en varias frecuencias o buckets.
from pyspark.ml.feature import Bucketizer
from pyspark.ml.linalg import Vectors
splits = [-float("inf"), -10, 0.0, 10, float("inf")]
b_data = [(-800.0,), (-10.5,), (-1.7,), (0.0,), (8.2,), (90.1,)]
b_df = spark.createDataFrame(b_data, ["features"])
b_df.show()
bucketizer = Bucketizer(splits=splits, inputCol= "features", outputCol="bfeatures")
bucketed_df = bucketizer.transform(b_df)
bucketed_df.show()
Fuente: https://spark.apache.org/docs/3.0.1/ml-features.html#bucketizer
Clustering#
Para agrupar datos en un razonable grupo de frecuencias se puede utilizar como técnica el llamado clustering.
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import glob
# Carga los datos
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# Entrena el modelo k-means
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# Hace predicciones
predictions = model.transform(dataset)
# Evalua utilizando Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
print(center)
Otro algoritmo de clustering implementado en MLlib es el llamado Bisecting K-Means.
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Carga de datos
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# Entrenamiento del modelo bisecting k-means
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)
# Hace predicciones
predictions = model.transform(dataset)
# Evalua utilizando Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Muestra los resultados
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
print(center)
Fuente: https://spark.apache.org/docs/3.0.1/ml-clustering.html#bisecting-k-means
Regresión utilizando PySpark#
En este ejemplo se realizara una regresión logística binomial.
from pyspark.ml.classification import LogisticRegression
# Se cargan los datos
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Ajusta el modelp
lrModel = lr.fit(training)
# Imprime los coeficientes y el intercepto para la regresión logística
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
# Se puede usar también la familia multinomial para clasificación binaria
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")
# Ajusta el modelo
mlrModel = mlr.fit(training)
# Imprima los coeficientes e intersecciones para la regresión logística con familia multinomial
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))
La clasificación Naive Bayes#
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
Creamos los splits de entrenamiento y test:
data = spark.read.format("libsvm") \
.load("data/mllib/sample_libsvm_data.txt")
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
Aplicamos la clasificación Naive bayes:
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(train)
predictions = model.transform(test)
predictions.show()
Evaluamos el clasificador entrenado:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
Clasificación con árboles de decisión#
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Cargar los datos en formato LIBSVM como un DataFrame
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Index labels agregan metadata a la columna label
# Ajuste en todo el conjunto de datos para incluir todas las etiquetas en el índice
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Identifica automáticamente características categóricas y las indexa
# Especifica maxCategories para que las entidades con > 4 valores distintos se traten como continuas
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Divide los datos en conjuntos de entrenamiento y prueba (30% retenido para prueba)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Entrena el modelo DecisionTree
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
# Encadena índices y árboles en tuna Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])
# Entrena el modelo y ejecuta los indexadores
model = pipeline.fit(trainingData)
# Hace predicciones
predictions = model.transform(testData)
# Selecciona filas de jemplo para mostrar
predictions.select("prediction", "indexedLabel", "features").show(5)
# Selecciona (predicción, etiqueta verdadera) y calcula el error de prueba
evaluator = MulticlassClassificationEvaluator(
labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
treeModel = model.stages[2]
# Resumen
print(treeModel)
Fuente: https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#decision-tree-classifier
Se pueden encontrar otros algoritmos de clasificación de la biblioteca Spark MLLib en:
https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#classification