Tutorial Spark#

  • Actualizamos nuestro sistema operativo e instalamos Java y Scala

apt update
  Cell In [1], line 1
    apt update
        ^
SyntaxError: invalid syntax
apt upgrade --fix-broken
apt install default-jre
apt autoremove
apt install default-jdk
apt install scala
java -version
openjdk version "11.0.13" 2021-10-19
OpenJDK Runtime Environment (build 11.0.13+8-Ubuntu-0ubuntu1.20.04)
OpenJDK 64-Bit Server VM (build 11.0.13+8-Ubuntu-0ubuntu1.20.04, mixed mode, sharing)
javac -version
javac 11.0.13
scala -version
readlink -f $(which java)
/usr/lib/jvm/java-11-openjdk-amd64/bin/java
  • Descargamos e instalamos Spark

wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
wget https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz.asc
--2022-02-23 15:19:24--  https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz.asc
Resolviendo downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.95.219, 2a01:4f9:3a:2c57::2, ...
Conectando con downloads.apache.org (downloads.apache.org)[135.181.214.104]:443... conectado.
Petición HTTP enviada, esperando respuesta... 200 OK
Longitud: 833 [text/plain]
Guardando como: “spark-3.2.1-bin-hadoop3.2.tgz.asc.1”

spark-3.2.1-bin-had 100%[===================>]     833  --.-KB/s    en 0s      

2022-02-23 15:19:25 (119 MB/s) - “spark-3.2.1-bin-hadoop3.2.tgz.asc.1” guardado [833/833]
wget https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz.sha512
wget https://downloads.apache.org/spark/KEYS
--2022-02-23 15:19:26--  https://downloads.apache.org/spark/KEYS
Resolviendo downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.95.219, 2a01:4f9:3a:2c57::2, ...
Conectando con downloads.apache.org (downloads.apache.org)[135.181.214.104]:443... conectado.
Petición HTTP enviada, esperando respuesta... 200 OK
Longitud: 101227 (99K)
Guardando como: “KEYS.1”

KEYS.1              100%[===================>]  98,85K   121KB/s    en 0,8s    

2022-02-23 15:19:28 (121 KB/s) - “KEYS.1” guardado [101227/101227]
gpg --import KEYS
gpg --verify spark-3.2.1-bin-hadoop3.2.tgz.asc  spark-3.2.1-bin-hadoop3.2.tgz
gpg: Firmado el jue 20 ene 2022 21:10:50 CET
gpg:                usando RSA clave CEA888BDB32D983C7F094564AC01E6E9139F610C
gpg: Firma correcta de "Huaxin Gao (CODE SIGNING KEY) <huaxin.gao11@gmail.com>" [desconocido]
gpg: ATENCIÓN: ¡Esta clave no está certificada por una firma de confianza!
gpg:          No hay indicios de que la firma pertenezca al propietario.
Huellas dactilares de la clave primaria: CEA8 88BD B32D 983C 7F09  4564 AC01 E6E9 139F 610C
ls
tar xvf spark-3.2.1-bin-hadoop3.2.tgz
  • Instalamos e configuramos la biblioteca de Python findspark

pip install -q findspark
pwd
/home/usuario/gitRepos/spark/sparkFiles
ls
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.1-bin-hadoop3.2"
  • Utilizamos la biblioteca findspark para crear un dataframe

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([{"Hola": "mundo"} for x in range(1000)])
df.show(3, False)

Extraer, transformar y selecionar#

  • Importamos la biblioteca pyspark

import pyspark
print(pyspark.__version__)
3.2.1

Transformar#

  • Transformar significa escalar, convertir o modificar las características de los datos.

  • Uno de los primeros pasos en NLP (Natural Language Processing) es convertir el texto en tokens o palabras tokenizadas.

from pyspark.ml.feature import Tokenizer
oraciones_df = spark.createDataFrame([
    (1, "Introducción a sparkMlib"),
    (2, "Mlib incluye bibliotecas para clasificación y regresión"),
    (3, "También incluye soporte a datapipe lines"),
    
], ["id", "oraciones"])
oraciones_df.show()
  • Para reflejar la importancia de una palabra en un texto utilizamos Term frequency-inverse document frequency (TF-IDF).

sent_token = Tokenizer(inputCol = "oraciones", outputCol = "palabras")
sent_tokenized_df = sent_token.transform(oraciones_df)
sent_tokenized_df.take(10)
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol = "palabras", outputCol = "rawfeatures", numFeatures = 20)
sent_fhTF_df = hashingTF.transform(sent_tokenized_df)
sent_fhTF_df.take(1)
idf = IDF(inputCol = "rawfeatures", outputCol = "idffeatures")
idfModel = idf.fit(sent_fhTF_df)
tfidf_df = idfModel.transform(sent_fhTF_df)
tfidf_df.take(1)
  • Utilizamos la clase StandardScaler para estandarizar datos en ML.

  • Los escala entre -1 y 1.

from pyspark.ml.feature import  StandardScaler
from pyspark.ml.linalg import Vectors
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,40000.0,2.0]),),
    (3, Vectors.dense([30.0,50000.0,3.0]),),
    
],["id", "features"] )
features_stand_scaler = StandardScaler(inputCol = "features", outputCol = "sfeatures", withStd=True, withMean=True)
stmodel = features_stand_scaler.fit(features_df)
stand_sfeatures_df = stmodel.transform(features_df)
stand_sfeatures_df.show()
  • Utilizamos la clase MinMaxScaler en ML para normalizar datos numéricos.

  • Escala los datos entre 0 y 1.

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,40000.0,2.0]),),
    (3, Vectors.dense([30.0,50000.0,3.0]),),
    
],["id", "features"] )
features_df.show()
  • Aplicamos la transformación de la biblioteca MinMaxScaler.

features_scaler = MinMaxScaler(inputCol = "features", outputCol = "sfeatures")
smodel = features_scaler.fit(features_df)
sfeatures_df = smodel.transform(features_df)
sfeatures_df.show()
  • La clase Bucketizer transforma los datos en varias frecuencias o buckets.

from pyspark.ml.feature import  Bucketizer
from pyspark.ml.linalg import Vectors
splits = [-float("inf"), -10, 0.0, 10, float("inf")]
b_data = [(-800.0,), (-10.5,), (-1.7,), (0.0,), (8.2,), (90.1,)]
b_df = spark.createDataFrame(b_data, ["features"])
b_df.show()
bucketizer = Bucketizer(splits=splits, inputCol= "features", outputCol="bfeatures")
bucketed_df = bucketizer.transform(b_df)
bucketed_df.show()

Fuente: https://spark.apache.org/docs/3.0.1/ml-features.html#bucketizer

Clustering#

  • Para agrupar datos en un razonable grupo de frecuencias se puede utilizar como técnica el llamado clustering.

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import glob
# Carga los datos
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# Entrena el modelo k-means
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)
# Hace predicciones
predictions = model.transform(dataset)
# Evalua utilizando Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
  • Otro algoritmo de clustering implementado en MLlib es el llamado Bisecting K-Means.

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Carga de datos
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")
# Entrenamiento del modelo bisecting k-means
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)
# Hace predicciones
predictions = model.transform(dataset)
# Evalua utilizando Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Muestra los resultados
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)

Fuente: https://spark.apache.org/docs/3.0.1/ml-clustering.html#bisecting-k-means

Regresión utilizando PySpark#

  • En este ejemplo se realizara una regresión logística binomial.

from pyspark.ml.classification import LogisticRegression
# Se cargan los datos
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Ajusta el modelp
lrModel = lr.fit(training)
# Imprime los coeficientes y el intercepto para la regresión logística
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
# Se puede usar también la familia multinomial para clasificación binaria
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")
# Ajusta el modelo
mlrModel = mlr.fit(training)
# Imprima los coeficientes e intersecciones para la regresión logística con familia multinomial
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

La clasificación Naive Bayes#

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
  • Creamos los splits de entrenamiento y test:

data = spark.read.format("libsvm") \
    .load("data/mllib/sample_libsvm_data.txt")

splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
  • Aplicamos la clasificación Naive bayes:

nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(train)
predictions = model.transform(test)
predictions.show()
  • Evaluamos el clasificador entrenado:

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Clasificación con árboles de decisión#

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Cargar los datos en formato LIBSVM como un DataFrame
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels agregan metadata a la columna label
# Ajuste en todo el conjunto de datos para incluir todas las etiquetas en el índice
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Identifica automáticamente características categóricas y las indexa
# Especifica maxCategories para que las entidades con > 4 valores distintos se traten como continuas
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Divide los datos en conjuntos de entrenamiento y prueba (30% retenido para prueba)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Entrena el modelo DecisionTree
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Encadena índices y árboles en tuna Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Entrena el modelo y ejecuta los indexadores
model = pipeline.fit(trainingData)

# Hace predicciones
predictions = model.transform(testData)

# Selecciona filas de jemplo para mostrar
predictions.select("prediction", "indexedLabel", "features").show(5)

# Selecciona (predicción, etiqueta verdadera) y calcula el error de prueba
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

treeModel = model.stages[2]
# Resumen
print(treeModel)