1. Introducción

Hola a todos y bienvenidos a un nuevo post de nuestro querido blog!. Recientemente he obtenido la certificación CRT020 Spark Databricks que consta de una parte teórica y otra práctica en donde se pone a prueba la soltura del candidato en la API de Dataframes de Spark. En este post vamos a ver algunos de los ejemplos que me han servido para preparar la parte práctica haciendo un recorrido por las agregaciones, joins, groupby o window.

Tecnologías empleadas:

  • Databricks
  • Spark 2.4.4
  • PySpark 2.4.4

Los siguientes ejemplos han sido ejecutados y verificados en el entorno de test que proporciona Databricks

2. Antes de empezar

Subimos el siguiente fichero al filesystem del entorno de Databricks.

id;name;surname;age
1;Jorge;Hernandez;32
2;Jorge;Hernandez;63
3;Jose;Hernandez;32
4;Barbara;Vazquez;27
5;Jose;Illan;28
6;Jose María;Gutierrez;33
6;Jose María;;80

Esto se hace desde la opción de menú Data

3. Leyendo ficheros

Sin esquema
Utilizamos la opción inferSchema para que las columnas id y age sean sean inferidas como enteros

from pyspark.sql.functions import *
from pyspark.sql.types import *

df = spark.read.format('csv')\
               .option('sep', ';')\
               .option('header', True)\
               .option('inferSchema', True)\
               .load('/FileStore/tables/user.csv')

Con esquema
Indicamos el esquema de forma programática

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([StructField('id', IntegerType()), 
                     StructField('name', StringType()), 
                     StructField('surname', StringType()), 
                     StructField('age', IntegerType())])
df = spark.read.format('csv')\
               .option('sep', ';')\
               .option('header', True)\
               .schema(schema)\
               .load('/FileStore/tables/user.csv')

4. Trabajando con Strings

translate

df.select(translate(col('surname'), 'ern', '123')).show()

Resultado

+----------------------------+
|translate(surname, ern, 123)|
+----------------------------+
|                   H123a3d1z|
|                   H123a3d1z|
|                   H123a3d1z|
|                     Vazqu1z|
|                       Illa3|
|                   Guti1221z|
|                        null|
+----------------------------+

lpad

spark.range(1).select(lpad(lit('HOLA'), 10, '#')).first()[0]

Resultado

'######HOLA'

trim

spark.range(1).select(trim(lit('  HOLA  '))).first()[0]

Resultado

'HOLA'

lower

spark.range(1).select(lower(lit('  HOLA  '))).first()[0]

Resultado

'hola'

upper

spark.range(1).select(upper(lit('  hola  '))).first()[0]

Resultado

'HOLA'

initcap

spark.range(1).select(initcap(lit('hola soy jorge'))).first()[0]

Resultado

'Hola Soy Jorge'

5. Trabajando con nulos

isNull

df.where(col('surname').isNull()).show()

Resultado

+---+----------+-------+---+
| id|      name|surname|age|
+---+----------+-------+---+
|  6|Jose María|   null| 80|
+---+----------+-------+---+

drop

df.na.drop('any', subset=['surname', 'name']).show()

Resultado

+---+----------+---------+---+
| id|      name|  surname|age|
+---+----------+---------+---+
|  1|     Jorge|Hernandez| 32|
|  2|     Jorge|Hernandez| 63|
|  3|      Jose|Hernandez| 32|
|  4|   Barbara|  Vazquez| 27|
|  5|      Jose|    Illan| 28|
|  6|Jose María|Gutierrez| 33|
+---+----------+---------+---+

fill

df.na.fill('NULO', subset=['surname', 'name']).show()

Resultado

+---+----------+---------+---+
| id|      name|  surname|age|
+---+----------+---------+---+
|  1|     Jorge|Hernandez| 32|
|  2|     Jorge|Hernandez| 63|
|  3|      Jose|Hernandez| 32|
|  4|   Barbara|  Vazquez| 27|
|  5|      Jose|    Illan| 28|
|  6|Jose María|Gutierrez| 33|
|  6|Jose María|     NULO| 80|
+---+----------+---------+---+

replace

df.na.replace(['Hernandez'], ['APELLIDO'], 'surname').show()

Resultado

+---+----------+---------+---+
| id|      name|  surname|age|
+---+----------+---------+---+
|  1|     Jorge| APELLIDO| 32|
|  2|     Jorge| APELLIDO| 63|
|  3|      Jose| APELLIDO| 32|
|  4|   Barbara|  Vazquez| 27|
|  5|      Jose|    Illan| 28|
|  6|Jose María|Gutierrez| 33|
|  6|Jose María|     null| 80|
+---+----------+---------+---+

6. Trabajando con expresiones regulares

regexp_replace
Para reemplazar partes de cadenas

reg = '(Jorge|Jose|Barbara)'
df.select(regexp_replace(col('name'), reg, 'PIPO'), 'name').show()

Resultado

+------------------------------------------------+----------+
|regexp_replace(name, (Jorge|Jose|Barbara), PIPO)|      name|
+------------------------------------------------+----------+
|                                            PIPO|     Jorge|
|                                            PIPO|     Jorge|
|                                            PIPO|      Jose|
|                                            PIPO|   Barbara|
|                                            PIPO|      Jose|
|                                      PIPO María|Jose María|
|                                      PIPO María|Jose María|
+------------------------------------------------+----------+

regexp_extract
Para búsqueda de cadenas a través de expresiones regulares

reg = 'Jorge|Jose|Barbara'
df.select(regexp_extract(col('name'), reg, 0), 'name').show()

Resultado

+-------------------------------------------+----------+
|regexp_extract(name, Jorge|Jose|Barbara, 0)|      name|
+-------------------------------------------+----------+
|                                      Jorge|     Jorge|
|                                      Jorge|     Jorge|
|                                       Jose|      Jose|
|                                    Barbara|   Barbara|
|                                       Jose|      Jose|
|                                       Jose|Jose María|
|                                       Jose|Jose María|
+-------------------------------------------+----------+

7. Trabajando con fechas

date_add, date_sub, datediff, months_between

spark.range(1)\
     .select(current_date().alias('date'))\
     .select(months_between(date_add(col('date'), 1), date_sub(col('date'), 1)),
             datediff(date_add(col('date'), 1), date_sub(col('date'), 1))).show()

Resultado

+----------------------------------------------------------+----------------------------------------------+
|months_between(date_add(date, 1), date_sub(date, 1), true)|datediff(date_add(date, 1), date_sub(date, 1))|
+----------------------------------------------------------+----------------------------------------------+
|                                                0.06451613|                                             2|
+----------------------------------------------------------+----------------------------------------------+

Convertir la fecha actual en string y viceserca

spark.range(1)\
     .select(current_timestamp().alias('date').cast(StringType()))\
     .select(unix_timestamp(col('date')).cast(TimestampType())).show()

Resultado

+------------------------------------------------------------+
|CAST(unix_timestamp(date, yyyy-MM-dd HH:mm:ss) AS TIMESTAMP)|
+------------------------------------------------------------+
|                                         2019-11-17 17:03:52|
+------------------------------------------------------------+

8. Trabajando con arrays

Buscar la palabra más repetida

spark.range(1)\
     .select(lit('a b c a a b').alias('value'))\
     .select(explode(split(col('value'), " ")).alias('word'))\
     .groupby('word')\
     .count()\
     .sort(desc('count'))\
     .limit(1)\
     .show()

Resultado

+----+-----+
|word|count|
+----+-----+
|   a|    3|
+----+-----+

size

spark.range(1)\
     .select(split(lit('a b c d e'), ' ').alias('array'))\
     .select(size(col('array'))).show()

Resultado

+-----------+
|size(array)|
+-----------+
|          5|
+-----------+

slice

spark.range(1)\
     .select(split(lit('a b c d e'), ' ').alias('array'))\
     .select(slice(col('array'), 1, 2)).show()

Resultado

+------------------+
|slice(array, 1, 2)|
+------------------+
|            [a, b]|
+------------------+

array_contains

spark.range(1)\
     .select(split(lit('a b c d e'), ' ').alias('array'))\
     .select(array_contains(col('array'), 'd')).show()

Resultado

+------------------------+
|array_contains(array, d)|
+------------------------+
|                    true|
+------------------------+

9. Trabajando con Window

Para cada usuario obtener la máxima edad del usuario con el que comparta apellido

window = Window.partitionBy(col('surname'))\
               .orderBy(col('age').desc())
df.select('name', 
          'age', 
          max(col('age')).over(window).alias('max_age_family'),
          rank().over(window).alias('max_age_family')).show()

Resultado

+----------+---+--------------+--------------+
|      name|age|max_age_family|max_age_family|
+----------+---+--------------+--------------+
|Jose María| 33|            33|             1|
|   Barbara| 27|            27|             1|
|Jose María| 80|            80|             1|
|      Jose| 28|            28|             1|
|     Jorge| 63|            63|             1|
|     Jorge| 32|            63|             2|
|      Jose| 32|            63|             2|
+----------+---+--------------+--------------+

10. Trabajando con Json

get_json_object

spark.range(1)\
     .select(expr(""" '{"a": {"b": [1, 2]}}'""").alias('j'))\
     .select(get_json_object(col('j'), "$.a.b")).show()

Resultado

+-------------------------+
|get_json_object(j, $.a.b)|
+-------------------------+
|                    [1,2]|
+-------------------------+

get_json_object

spark.range(1)\
     .select(expr(""" '{"a": {"b": [1, 2]}}'""")\
     .alias('j'))\
     .select(json_tuple(col('j'), "a")).show()

Resultado

+-----------+
|         c0|
+-----------+
|{"b":[1,2]}|
+-----------+

11. Trabajando con Groupby

Calcular la edad máxima por apellido y ordenar descendentemente

df.dropna('any')\
  .groupby('surname')\
  .agg(max('age').alias('max_date'))\
  .sort(col('max_date').desc())\
  .show()

Resultado

+---------+--------+
|  surname|max_date|
+---------+--------+
|Hernandez|      63|
|Gutierrez|      33|
|    Illan|      28|
|  Vazquez|      27|
+---------+--------+

Calcular la edad media por apellido haciendo pivot por el nombre

df.groupby('surname')\
  .pivot('name')\
  .avg('age')\
  .show()

Resultado

+---------+-------+-----+----+----------+
|  surname|Barbara|Jorge|Jose|Jose María|
+---------+-------+-----+----+----------+
|Gutierrez|   null| null|null|      33.0|
|  Vazquez|   27.0| null|null|      null|
|     null|   null| null|null|      80.0|
|    Illan|   null| null|28.0|      null|
|Hernandez|   null| 47.5|32.0|      null|
+---------+-------+-----+----+----------+

12. Trabajando con Struct

struct

df.na.drop('any')\
     .select(struct(col('name'), col('surname')).alias('struct'))\
     .select('struct.*').show()

Resultado

+----------+---------+
|      name|  surname|
+----------+---------+
|     Jorge|Hernandez|
|     Jorge|Hernandez|
|      Jose|Hernandez|
|   Barbara|  Vazquez|
|      Jose|    Illan|
|Jose María|Gutierrez|
+----------+---------+

to_json, from_json

schema = StructType([StructField('name', StringType()),
                    StructField('surname', StringType())])
df.na.drop('any')\
     .select(struct(col('name'), col('surname')).alias('struct')) 
     .select(to_json(col('struct')).alias('json'))\
     .select(from_json(col('json'), schema)).show()

Resultado

+--------------------+
| jsontostructs(json)|
+--------------------+
|  [Jorge, Hernandez]|
|  [Jorge, Hernandez]|
|   [Jose, Hernandez]|
|  [Barbara, Vazquez]|
|       [Jose, Illan]|
|[Jose María, Guti...|
+--------------------+

13. Trabajando con Joins

join

df_city = spark.createDataFrame([[1, 'Las Palmas'], [2, 'Madrid']]).toDF('id', 'name')
df.join(df_city, df.id == df_city.id).drop(df_city.id).show()

Resultado

+---+-----+---------+---+----------+
| id| name|  surname|age|      name|
+---+-----+---------+---+----------+
|  1|Jorge|Hernandez| 32|Las Palmas|
|  2|Jorge|Hernandez| 63|    Madrid|
+---+-----+---------+---+----------+

14. Escribiendo en ficheros

Escribir el dataframe en un fichero con formato parquet

df.write\
  .format('parquet')\
  .mode('overwrite')\
  .partitionBy('surname')\
  .save('/FileStore/tables/user.parquet')

14. Lista de Acciones vs Transformaciones Narrow vs Transformaciones Wide

Las transformaciones en Spark son lazy. Esto quiere decir que no se ejecutan hasta que una acción es realizada. Por ello es importante conocer la diferencia entre acciones y transformaciones.
Acciones
Son métodos que desencadenan el inicio de un job en el cluster.

  • show
  • collect
  • take
  • first
  • forEach
  • toLocalIterator
  • save
  • saveAsTable

Transformaciones Narrow
Son aquellas transformaciones en las que no hay shuffle de datos, es decir cada partición es capaz de resolver la tarea actual sin necesidad de comunicarse con otras particiones

  • union
  • filter / where
  • drop
  • withColumn
  • withColumnRenamed
  • limit
  • join (cuando hay broadcast)

Transformaciones Wide
Son aquellas transformaciones en las que hay shuffle de datos, es decir cada partición necesita hablar con las otras para resolver la tarea actual
Exchange HashPartitions

  • join
  • sort
  • groupby
  • distinct

Exchange RoundRobinPartition

  • repartition

Exchange Simple

  • Todas las agregaciones, first, max, min, count, avg, …

Otras

  • coalesce (Aunque no produce shuffle se le considera transformación wide)