Tuesday, May 22, 2018

Spark dataframe JSON schema mis-inference - string typed column instead of struct

All you wanted was to load some complex JSON files into a dataframe,
and use SQL with the [lateral view explode] function to parse the JSON.

Sounds like the basics of Spark SQL.

A problem can arise when one of the inner fields of the JSON
holds undesired non-JSON values in some of the records.

For instance, an inner field might contain HTTP error text, which would cause the field to be interpreted as a string rather than as a struct.
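To illustrate with made-up records (field names and values here are hypothetical), the same field can be a JSON object in one record and plain text in another; the only common type Spark can infer for both is string:

```python
import json

# Two hypothetical input records: in the first, requestBody is a proper
# JSON object; in the second, an upstream HTTP error leaked in as plain text.
good = '{"headers": {"items": []}, "requestBody": {"items": [{"id": 1}]}}'
bad = '{"headers": {"items": []}, "requestBody": "HTTP 502 Bad Gateway"}'

# The same field parses to two different types.
print(type(json.loads(good)["requestBody"]))  # <class 'dict'> -> struct
print(type(json.loads(bad)["requestBody"]))   # <class 'str'>  -> string
```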

As a result, our schema would look like:

root
 |-- headers: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |-- requestBody: string (nullable = true)

Instead of 

root
 |-- headers: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |-- requestBody: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)


When trying to explode a "string" typed column, we will get a type mismatch error:

org.apache.spark.sql.AnalysisException: Can't extract value from requestBody#10


How can we remove the non-JSON values and still get the correct schema in the dataframe?

Removing the non-JSON values using string filtering with Spark SQL on the requestBody column
will clean the data, but it won't change the type of the column, so we still won't be able to use JSON functions on it.

In order to clean the data and then get the right schema,
we should load the dataset into an RDD, filter out the bad rows, and create a dataframe from the clean RDD:

Good solution:  RDD -> Cleansing -> Dataframe (using spark.read.json(cleanRDD))
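A minimal sketch of that cleansing step, in plain Python with only the standard library (the record layout and field name are assumptions); in Spark you would apply the same predicate with rdd.filter before calling spark.read.json:

```python
import json

def has_json_request_body(line: str) -> bool:
    """Keep only records whose requestBody is a proper JSON object."""
    try:
        record = json.loads(line)
    except ValueError:
        return False  # the whole line is not valid JSON
    return isinstance(record.get("requestBody"), dict)

# Hypothetical raw lines, as they would come out of the text file / RDD.
raw_lines = [
    '{"requestBody": {"items": [{"id": 1}]}}',
    '{"requestBody": "HTTP 502 Bad Gateway"}',  # bad record, filtered out
    'not json at all',                          # bad record, filtered out
]

clean_lines = [line for line in raw_lines if has_json_request_body(line)]
print(len(clean_lines))  # 1
```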



A bad solution would be to load the data as a dataframe, with the requestBody column inferred as a string, then filter out the bad records and store the dataframe back to disk.

At that point, the value of the string-typed requestBody will be encapsulated in quotes (with its inner quotes escaped), and any future attempt to load the "fixed" dataset into a dataframe would again set this column's type to string.
We would then have to reload the "fixed" data into an RDD and strip the quote encapsulation using replace calls:

val fixedRDD = badRDD.map(_.replace("\\\"", "\"")).map(_.replace("\"{", "{")).map(_.replace("}\"", "}"))
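The same unwrapping can be sketched in plain Python on a single hypothetical record: on disk, requestBody was written back as a quote-wrapped string with escaped inner quotes, and the three replaces turn it back into an inline JSON object:

```python
import json

# What the "fixed" dataset looks like on disk: the struct was written
# back as a quoted string with escaped inner quotes.
stored = '{"requestBody": "{\\"items\\": [{\\"id\\": 1}]}"}'

# Same replace chain as the Scala snippet above: unescape the inner
# quotes, then drop the quotes wrapping the braces.
repaired = stored.replace('\\"', '"').replace('"{', '{').replace('}"', '}')

record = json.loads(repaired)
print(type(record["requestBody"]))  # <class 'dict'>
```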

From that point we can use spark.read.json to get a valid dataframe, and query it with the explode function!

Good luck,
and don't forget that sometimes an RDD is the right way to clean the data before getting it ready for higher-level APIs, such as dataframes.

Saturday, January 27, 2018

SolrCloud - suggestions for leaders rebalancing


This is a useful and simple bash script that helps rebalance the cluster leaders when more than one leader of the same collection is hosted on the same node.

Hosting more than one leader of the same collection on the same node (the leaders of two different shards) is not good practice, as it does not help distribute the collection's write load.

The script suggests the next action -> move the leader of shard X of collection Y from node A to node B. What this actually means is -> add node B to shard X, and remove node A from shard X of collection Y. It then asks you to re-run the script afterwards to get the next required action, based on the new Solr cluster status.

If it doesn't find any collection with more than one leader hosted on the same node, it won't suggest anything.

In Solr 7, new rules for cluster auto-scaling and management were added, and they are worth checking out as well.

The script contains many useful commands for parsing the "clusterstatus" output.
You could extend it to also suggest a general rebalancing of the cluster, for instance when too many leaders are hosted on a few nodes while other nodes host no leaders at all.
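As an illustration of the kind of clusterstatus parsing the script does (sketched in Python for readability, over a made-up, trimmed-down CLUSTERSTATUS payload; the real jq-based script works on the full response), counting leaders per node boils down to:

```python
import json
from collections import Counter

# Hypothetical, heavily trimmed CLUSTERSTATUS response.
cluster_status = json.loads('''
{
  "cluster": {
    "collections": {
      "col1": {
        "shards": {
          "shard1": {"replicas": {
            "r1": {"node_name": "node01:8983_solr", "leader": "true"},
            "r2": {"node_name": "node02:8983_solr"}
          }},
          "shard2": {"replicas": {
            "r3": {"node_name": "node01:8983_solr", "leader": "true"}
          }}
        }
      }
    }
  }
}
''')

# Walk collections -> shards -> replicas, keep only leaders,
# and count them per host (the part before the ":" in node_name).
leaders = Counter(
    replica["node_name"].split(":")[0]
    for collection in cluster_status["cluster"]["collections"].values()
    for shard in collection["shards"].values()
    for replica in shard["replicas"].values()
    if replica.get("leader") == "true"
)
print(leaders)  # Counter({'node01': 2}) -> both leaders of col1 on one node
```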

It only requires jq to be installed on the machine running it.

Example execution: ./solr-suggest.sh "https://solr-node01"



#!/bin/bash
URL="$1"; shift


raw_cluster_status=$(curl -s -k "${URL}/solr/admin/collections?action=CLUSTERSTATUS&wt=json" | \
                jq '.cluster.collections[].shards[].replicas[]')



all_nodes=$(echo "${raw_cluster_status}" | jq '. |  "\(.node_name)" '  | sort | uniq | tr -d "\"" | awk -F ':' '{print $1}')
all_leaders_sorted=$(echo "${raw_cluster_status}" | jq '.  | select(.leader=="true") |  "\(.node_name)" ' | tr -d '"' | sort| uniq -c | sort | tr -s " " | tr " " "-"  | awk -F ':' '{print $1}')
all_leaders_sorted_reverse=$(echo "${raw_cluster_status}" | jq '.  | select(.leader=="true") |  "\(.node_name)" ' | tr -d '"' | sort| uniq -c | sort -r | tr -s " " | tr " " "-" | awk -F ':' '{print $1}')
no_leader_nodes=$(diff <(echo "$all_nodes" | sort ) <(echo "$all_leaders_sorted_reverse" | awk -F '-' '{print $3}' | sort) | tr -d "<" | tr -d " " | sed 's/^...$//g' | grep -v -e '^$' )
leader_host_to_shards=$(echo "${raw_cluster_status}" | jq '. | select(.leader=="true")  |   "\(.core) | \(.node_name)"' | tr -d " " | awk -F '|' '{gsub("\"","",$1); gsub("\"","",$2); split($1,coll,"_replica"); split($2,coll2,":"); print coll[1]"@"coll2[1]}')
all_collections=$(echo "${raw_cluster_status}" | jq '.  |  "\(.core)" ' | sed 's/\"\(.*\)_shard.*/\1/'  | sort | uniq )



nodes_sorted_tmp=/tmp/nodes_sorted.tmp
rm $nodes_sorted_tmp 2>/dev/null

for node in $no_leader_nodes; do echo "-0-$node" >> $nodes_sorted_tmp; done
for leader in $all_leaders_sorted; do echo "$leader" >> $nodes_sorted_tmp; done
all_nodes_sorted=$(cat $nodes_sorted_tmp)

rm $nodes_sorted_tmp 2>/dev/null


echo "-----------------------------"
echo "number of leaders per node:"
echo "${all_nodes_sorted}" | tr -s "-" " "


echo -e "\n---------------------SUGGESTIONS FOR RE-BALANCE NODES THAT HOST MORE THAN ONE LEADER OF THE SAME COLLECTION---------------------"


for col in $all_collections; do
        collection_leaders=$(echo "${leader_host_to_shards}" | grep $col"_shard" )
        bad_nodes=$(echo "$collection_leaders" |  awk -F '@' '{print $2}' | sort | uniq -c |  grep -v "1 " )

        for bad_node in $(echo "$bad_nodes" | awk '{print $2}'); do
                related_shards=$(echo "$collection_leaders" | grep $bad_node |  awk -F '@' '{split($1,srd   ,"shard"); print "shard"srd[2] }' )
                for shard in $related_shards; do
                        echo $shard
                        shard_nodes=$(echo $raw_cluster_status | jq  '. | select(.core | contains("'${col}'_'${shard}'")) | .node_name')
                        for inode in $(echo "$all_nodes_sorted"); do
                                node=$(echo $inode | awk -F '-' '{print $3}')
                                echo "checking candidate $node"
                                echo "$shard_nodes"
                                #check that the candidate node is not already part of the shard
                                if ! echo "$shard_nodes" | grep -q "$node"; then
                                        #check that the candidate node doesn't already host a leader of this collection
                                        if ! echo "$collection_leaders" | grep -q "$node"; then
                                                #the node with the least number of leaders and is not a member of the problematic shard -> replace the current leader with this node
                                                echo "Node $bad_node is hosting more than 1 leader of collection - $col, shard - $shard"
                                                echo "___________________________________________________________________________________________________________"
                                                echo "Move the replica that is hosted on $bad_node to $node, which has the least number of leaders on it."
                                                echo "___________________________________________________________________________________________________________"
                                                echo "Re-run this script again after switching nodes for the replica"
                                                exit
                                        fi
                                fi
                        done
                done
        echo "Couldn't find a replacement host for $bad_node on collection $col. Remove this replica and add it back so that the leader moves to a different host"
        done

done


echo -e "\nGreat. All nodes are hosting at most one leader of the same collection.\n"