# Tutorial Spark 

- Para realizar este tutorial abrimos un cuaderno de Jupyter <https://jupyter.org/> en nuestro servidor local o utilizamos Colaboratory: <https://colab.research.google.com>.
- Los pasos son los siguientes:

- Actualizamos nuestro sistema operativo e instalamos Java y Scala

In [None]:
apt update

In [None]:
apt upgrade --fix-broken

In [None]:
apt install default-jre

In [None]:
apt autoremove

In [None]:
apt install default-jdk

In [None]:
apt install scala

In [6]:
java -version

openjdk version "11.0.13" 2021-10-19
OpenJDK Runtime Environment (build 11.0.13+8-Ubuntu-0ubuntu1.20.04)
OpenJDK 64-Bit Server VM (build 11.0.13+8-Ubuntu-0ubuntu1.20.04, mixed mode, sharing)


In [7]:
javac -version

javac 11.0.13


In [None]:
scala -version

In [9]:
readlink -f $(which java)

/usr/lib/jvm/java-11-openjdk-amd64/bin/java


- Descargamos e instalamos Spark

In [24]:
wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

In [17]:
wget https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz.asc

--2022-02-23 15:19:24--  https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz.asc
Resolviendo downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.95.219, 2a01:4f9:3a:2c57::2, ...
Conectando con downloads.apache.org (downloads.apache.org)[135.181.214.104]:443... conectado.
Petición HTTP enviada, esperando respuesta... 200 OK
Longitud: 833 [text/plain]
Guardando como: “spark-3.2.1-bin-hadoop3.2.tgz.asc.1”


2022-02-23 15:19:25 (119 MB/s) - “spark-3.2.1-bin-hadoop3.2.tgz.asc.1” guardado [833/833]



In [None]:
wget https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz.sha512

In [19]:
wget https://downloads.apache.org/spark/KEYS

--2022-02-23 15:19:26--  https://downloads.apache.org/spark/KEYS
Resolviendo downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.95.219, 2a01:4f9:3a:2c57::2, ...
Conectando con downloads.apache.org (downloads.apache.org)[135.181.214.104]:443... conectado.
Petición HTTP enviada, esperando respuesta... 200 OK
Longitud: 101227 (99K)
Guardando como: “KEYS.1”


2022-02-23 15:19:28 (121 KB/s) - “KEYS.1” guardado [101227/101227]



In [None]:
gpg --import KEYS

In [21]:
gpg --verify spark-3.2.1-bin-hadoop3.2.tgz.asc  spark-3.2.1-bin-hadoop3.2.tgz

gpg: Firmado el jue 20 ene 2022 21:10:50 CET
gpg:                usando RSA clave CEA888BDB32D983C7F094564AC01E6E9139F610C
gpg: Firma correcta de "Huaxin Gao (CODE SIGNING KEY) <huaxin.gao11@gmail.com>" [desconocido]
gpg: ATENCIÓN: ¡Esta clave no está certificada por una firma de confianza!
gpg:          No hay indicios de que la firma pertenezca al propietario.
Huellas dactilares de la clave primaria: CEA8 88BD B32D 983C 7F09  4564 AC01 E6E9 139F 610C


In [None]:
ls

In [None]:
tar xvf spark-3.2.1-bin-hadoop3.2.tgz

- Instalamos e configuramos la biblioteca de Python *findspark*

In [26]:
pip install -q findspark

In [29]:
pwd

/home/usuario/gitRepos/spark/sparkFiles


In [None]:
ls

In [31]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.1-bin-hadoop3.2"

- Utilizamos la biblioteca *findspark* para crear un *dataframe* 

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
df = spark.createDataFrame([{"Hola": "mundo"} for x in range(1000)])
df.show(3, False)

## Extraer, transformar y selecionar

- Importamos la biblioteca *pyspark*

In [36]:
import pyspark
print(pyspark.__version__)

3.2.1


## Transformar

- Transformar significa escalar, convertir o modificar las características de los datos.

-  Uno de los primeros pasos en NLP (Natural Language Processing) es convertir el texto en tokens o palabras *tokenizadas*.

In [None]:
from pyspark.ml.feature import Tokenizer

In [None]:
oraciones_df = spark.createDataFrame([
    (1, "Introducción a sparkMlib"),
    (2, "Mlib incluye bibliotecas para clasificación y regresión"),
    (3, "También incluye soporte a datapipe lines"),
    
], ["id", "oraciones"])

In [None]:
oraciones_df.show()

- Para reflejar la importancia de una palabra en un texto utilizamos *Term frequency-inverse document frequency (TF-IDF)*.

In [None]:
sent_token = Tokenizer(inputCol = "oraciones", outputCol = "palabras")
sent_tokenized_df = sent_token.transform(oraciones_df)

In [None]:
sent_tokenized_df.take(10)

In [None]:
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol = "palabras", outputCol = "rawfeatures", numFeatures = 20)
sent_fhTF_df = hashingTF.transform(sent_tokenized_df)

In [None]:
sent_fhTF_df.take(1)

In [None]:
idf = IDF(inputCol = "rawfeatures", outputCol = "idffeatures")
idfModel = idf.fit(sent_fhTF_df)
tfidf_df = idfModel.transform(sent_fhTF_df)

In [None]:
tfidf_df.take(1)

- Fuente: <https://spark.apache.org/docs/3.0.1/ml-features.html#tokenizer>

- Utilizamos la clase StandardScaler para estandarizar datos en ML.
- Los escala entre -1 y 1.

In [None]:
from pyspark.ml.feature import  StandardScaler
from pyspark.ml.linalg import Vectors

In [None]:
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,40000.0,2.0]),),
    (3, Vectors.dense([30.0,50000.0,3.0]),),
    
],["id", "features"] )

In [None]:
features_stand_scaler = StandardScaler(inputCol = "features", outputCol = "sfeatures", withStd=True, withMean=True)
stmodel = features_stand_scaler.fit(features_df)
stand_sfeatures_df = stmodel.transform(features_df)

In [None]:
stand_sfeatures_df.show()

- Fuente: MA Raza, Ph.D. <https://towardsdatascience.com/machine-learning-with-spark-f1dbc1363986>.
- Spark docs: <https://spark.apache.org/docs/3.0.1/ml-features.html#standardscaler>.

- Utilizamos la clase MinMaxScaler en ML para normalizar datos numéricos. 
- Escala los datos entre 0 y 1.

In [None]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [None]:
features_df = spark.createDataFrame([
    (1, Vectors.dense([10.0,10000.0,1.0]),),
    (2, Vectors.dense([20.0,40000.0,2.0]),),
    (3, Vectors.dense([30.0,50000.0,3.0]),),
    
],["id", "features"] )

In [None]:
features_df.show()

- Aplicamos la transformación de la biblioteca MinMaxScaler.

In [None]:
features_scaler = MinMaxScaler(inputCol = "features", outputCol = "sfeatures")
smodel = features_scaler.fit(features_df)
sfeatures_df = smodel.transform(features_df)

In [None]:
sfeatures_df.show()

- Fuente: <https://spark.apache.org/docs/3.0.1/ml-features.html#minmaxscaler>
- El ejemplo completo se encuentra en *examples/src/main/python/ml/min_max_scaler_example.py* dentro del repositorio de Spark.

- La clase Bucketizer transforma los datos en varias frecuencias o *buckets*.

In [None]:
from pyspark.ml.feature import  Bucketizer
from pyspark.ml.linalg import Vectors

In [None]:
splits = [-float("inf"), -10, 0.0, 10, float("inf")]
b_data = [(-800.0,), (-10.5,), (-1.7,), (0.0,), (8.2,), (90.1,)]
b_df = spark.createDataFrame(b_data, ["features"])

In [None]:
b_df.show()

In [None]:
bucketizer = Bucketizer(splits=splits, inputCol= "features", outputCol="bfeatures")
bucketed_df = bucketizer.transform(b_df)

In [None]:
bucketed_df.show()

Fuente: <https://spark.apache.org/docs/3.0.1/ml-features.html#bucketizer>

## Clustering

- Para agrupar datos en un razonable grupo de frecuencias se puede utilizar como técnica el llamado *clustering*.

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import glob

In [37]:
# Carga los datos
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

In [None]:
# Entrena el modelo k-means
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

In [None]:
# Hace predicciones
predictions = model.transform(dataset)

In [None]:
# Evalua utilizando Silhouette score
evaluator = ClusteringEvaluator()

In [None]:
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

In [None]:
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

- Fuente: <https://spark.apache.org/docs/3.0.1/ml-clustering.html#k-means>

- Otro algoritmo de *clustering* implementado en MLlib es el llamado *Bisecting K-Means*.

In [None]:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [None]:
# Carga de datos
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

In [None]:
# Entrenamiento del modelo bisecting k-means
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)

In [None]:
# Hace predicciones
predictions = model.transform(dataset)

In [None]:
# Evalua utilizando Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

In [None]:
# Muestra los resultados
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)

Fuente: <https://spark.apache.org/docs/3.0.1/ml-clustering.html#bisecting-k-means>

## Regresión utilizando PySpark

- En este ejemplo se realizara una regresión logística binomial.

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
# Se cargan los datos
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

In [None]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [None]:
# Ajusta el modelp
lrModel = lr.fit(training)

In [None]:
# Imprime los coeficientes y el intercepto para la regresión logística
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

In [None]:
# Se puede usar también la familia multinomial para clasificación binaria
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

In [None]:
# Ajusta el modelo
mlrModel = mlr.fit(training)

In [None]:
# Imprima los coeficientes e intersecciones para la regresión logística con familia multinomial
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

- Fuente: <https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#binomial-logistic-regression>

## La clasificación Naive Bayes

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

- Creamos los *splits* de entrenamiento y test:

In [None]:
data = spark.read.format("libsvm") \
    .load("data/mllib/sample_libsvm_data.txt")

splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

- Aplicamos la clasificación Naive bayes:

In [None]:
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(train)
predictions = model.transform(test)

In [None]:
predictions.show()

- Evaluamos el clasificador entrenado:

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

- Fuente: <https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#naive-bayes>

## Clasificación con árboles de decisión

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Cargar los datos en formato LIBSVM como un DataFrame
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels agregan metadata a la columna label
# Ajuste en todo el conjunto de datos para incluir todas las etiquetas en el índice
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Identifica automáticamente características categóricas y las indexa
# Especifica maxCategories para que las entidades con > 4 valores distintos se traten como continuas
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

In [None]:
# Divide los datos en conjuntos de entrenamiento y prueba (30% retenido para prueba)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Entrena el modelo DecisionTree
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Encadena índices y árboles en tuna Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Entrena el modelo y ejecuta los indexadores
model = pipeline.fit(trainingData)

# Hace predicciones
predictions = model.transform(testData)

# Selecciona filas de jemplo para mostrar
predictions.select("prediction", "indexedLabel", "features").show(5)

# Selecciona (predicción, etiqueta verdadera) y calcula el error de prueba
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

treeModel = model.stages[2]
# Resumen
print(treeModel)

- Fuente: <https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#decision-tree-classifier>
- Se pueden encontrar otros algoritmos de clasificación de la biblioteca Spark MLLib en:     
<https://spark.apache.org/docs/3.0.1/ml-classification-regression.html#classification>