Thursday, December 3, 2015

Creating a standalone HiveMetastore (Not in Hadoop cluster)

When benchmarking Presto on top of S3 files, I found out that I had to install a Hive metastore instance.


(A standalone bee)

I didn't need HiveServer, MapReduce, or a Hadoop cluster. So how do you do that?

Here are the steps:


  1. Install a Hive metastore repository - an instance of one of the databases the Hive metastore works with (MySQL, PostgreSQL, MS SQL, Oracle... check the documentation)
  2. Install Java
  3. Download vanilla Hadoop from http://hadoop.apache.org/releases.html and unpack it on the Hive metastore instance (let's say you unpacked it to /apps/hadoop-2.6.2)
  4. Set environment variables:
    1. export HADOOP_PREFIX=/apps/hadoop-2.6.2
    2. export HADOOP_USER_CLASSPATH_FIRST=true
  5. Download Hive from http://www.apache.org/dyn/closer.cgi/hive/ and unpack it on your instance
  6. Create a schema (user) for the Hive user and build the Hive schema in the metastore repository database using the Hive scripts (a sample script for MySQL):
    1. /apps/apache-hive-1.2.1-bin/scripts/metastore/upgrade/mysql/hive-schema-1.2.0.mysql.sql
  7. Configure hive-site.xml with the right parameters for (a sample file appears right after this list):
    1. ConnectionURL (jdbc:mysql://localhost:3666/hive for example)
    2. ConnectionDriverName
    3. ConnectionUserName (the created database user)
    4. ConnectionPassword (the created database user's password)
    5. hive.metastore.warehouse.dir - set it to a local path (file:///home/presto/ for example)
  8. Copy the JAR required for the JDBC connection to the metastore repository into the Hive classpath (ojdbc6 for Oracle, mysql-connector-java for MySQL, and so on)
  9. Start the Hive metastore: /apps/apache-hive-1.2.1-bin/bin/hive --service metastore
  10. For accessing S3:
    1. copy these JARs to the classpath:
      1. aws-java-sdk-1.6.6.jar (http://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk/1.6.6)
      2. hadoop-aws-2.6.0.jar (http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.0/hadoop-aws-2.6.0.jar)
    2. you can specify these parameters in Hadoop's core-site.xml:
      1. fs.s3.awsAccessKeyId
      2. fs.s3.awsSecretAccessKey
      3. <property>
           <name>fs.s3n.impl</name>
           <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
         </property>
    3. for secured access to S3, use an S3A URL:
      1. add fs.s3a.connection.ssl.enabled to hadoop_home/etc/hadoop/core-site.xml
      2. you also need to set these parameters for S3 access in Hadoop's core-site.xml file:
        1. fs.s3a.secret.key
        2. fs.s3a.access.key
      3. Unfortunately, there was no support at the time for temporary S3 credentials
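
To make steps 7 and 10 more concrete, here is a minimal sketch of the two files, assuming MySQL as the metastore repository (hosts, ports, users and keys are examples - adjust them to your environment; the full property names are the javax.jdo.option.* ones):

/apps/apache-hive-1.2.1-bin/conf/hive-site.xml:

 <configuration>
   <property>
     <name>javax.jdo.option.ConnectionURL</name>
     <value>jdbc:mysql://localhost:3666/hive</value>
   </property>
   <property>
     <name>javax.jdo.option.ConnectionDriverName</name>
     <value>com.mysql.jdbc.Driver</value>
   </property>
   <property>
     <name>javax.jdo.option.ConnectionUserName</name>
     <value>hive</value>
   </property>
   <property>
     <name>javax.jdo.option.ConnectionPassword</name>
     <value>hive_password</value>
   </property>
   <property>
     <name>hive.metastore.warehouse.dir</name>
     <value>file:///home/presto/</value>
   </property>
 </configuration>

And the S3A part of step 10, added to $HADOOP_PREFIX/etc/hadoop/core-site.xml:

 <property><name>fs.s3a.access.key</name><value>YOUR_ACCESS_KEY</value></property>
 <property><name>fs.s3a.secret.key</name><value>YOUR_SECRET_KEY</value></property>
 <property><name>fs.s3a.connection.ssl.enabled</name><value>true</value></property>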



Finally, when running Presto, we use the Thrift address and port of the Hive metastore service.
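
For example, a minimal Hive catalog file for Presto might look like this (a sketch - the host names and CLI flags below are examples; 9083 is the default metastore Thrift port):

 # etc/catalog/hive.properties on the Presto nodes
 connector.name=hive-hadoop2
 hive.metastore.uri=thrift://metastore-host:9083
 # S3 credentials can also go here, e.g. hive.s3.aws-access-key / hive.s3.aws-secret-key

and a quick sanity check with the Presto CLI:

 ./presto --server presto-coordinator:8080 --catalog hive --schema default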

If you run EMR from time to time, you can point it at that external metastore repository, according to the AWS documentation.

That's it. No need for additional Hadoop libraries or settings.
Good luck!

Tuesday, December 1, 2015

Secured ELK (including filebeat)

It is not so difficult to start an ELK topology: an application server that forwards logs to a Logstash server, and a Logstash server that sends logs in the right schema to Elasticsearch.

Then, it's also quite straightforward to start an Elasticsearch cluster with Kibana for some visualizations.

The problem started for us when we wanted to secure the whole pipeline - from the app server to the client through Logstash, Elasticsearch and Kibana.

We also got to know Filebeat, and we almost immediately fell in love.

I will explain here, step by step, the whole installation and deploying process that we have gone through. All the configuration files are attached. Enjoy.

General Architecture: Filebeat agents on the application servers ship the logs to Logstash, Logstash forwards them to Elasticsearch, and Kibana sits on top of Elasticsearch for visualization.

  1. Components:
    1. Filebeat - an evolution of the old logstash-forwarder. A light and easy-to-use tool for shipping data from log files to Logstash.
    2. Logstash - a ready-to-use tool for sending log data to Elasticsearch. It supports many other outputs, inputs and manipulations of its input log records. I will show the integration with Filebeat as input and Elasticsearch and S3 as outputs.
    3. Elasticsearch - our back end for storing and indexing the logs.
    4. Kibana - the visualization tool on top of Elasticsearch.
  2. Filebeat agent installation (talking with HTTPS to logstash)
    1. As of the time of the project, the newest version of Filebeat (1.0.0-rc1) had a bug when sending data to Logstash: it automatically creates the "@timestamp" field, which also gets created by Logstash, and that makes it fail. We worked with the 1.0.0-beta4 version (the first one) of Filebeat.
    2. Configuration file (a minimal sketch of the Filebeat and Logstash configuration appears right after these steps):
      1. Take a look at the "logstash as output" scope.
      2. We want to make Filebeat trust the Logstash server. If you signed the certificate that Logstash is using with some CA, you can give Filebeat a certificate that is signed by that CA. Configuration: "certificate_authorities".
      3. If you didn't use a signed certificate for the Logstash server, you can use the certificate and the public key of the Logstash server (I'll explain later how to create them), as demonstrated in the configuration file.
      4. Keep in mind that we don't use Java keystores for the Filebeat-Logstash connection.
    3. Starting command: filebeat_location/filebeat -v -c filebeat_location/filebeat.yml
  3. Logstash (HTTPS communication to Elasticsearch)
    1. Download and unpack
    2. generate certificate:
      1. openssl req -new -text -out server.req
      2. when asked for the Common Name, enter the server's IP, e.g.: Common Name (eg, your name or your server's hostname) []: 10.184.2.232
      3. openssl rsa -in privkey.pem -out server.key
      4.  rm privkey.pem
      5. sign the req file with the CA certificate
    3. Logstash configuration
      1. The input is configured to get data from Filebeat.
      2. In the filter and grok scope:
        1. we create the JSON documents out of the "message" field that we get from Filebeat.
        2. we duplicate our "msg" field into a field we call "msg_tokenized" - that's important for Elasticsearch later on. On the one hand we want to search the logs, so we keep an analyzed copy; on the other hand we want to visualize whole messages, so we also keep "msg" as a not_analyzed field. I will explain later how to set an Elasticsearch template for that.
        3. we create a new field with the file name of the source log file.
        4. we remove the "logtime" field.
        5. we remove the "message" field.
      3. Output scope:
        1. S3 and Elasticsearch outputs.
        2. For SSL between Logstash and Elasticsearch, you can use the CA name in the cacert configuration under the elasticsearch output.
        3. If we specify a new name for our index in the Logstash elasticsearch output, the index gets created automatically. We decided to create a new index for each log file, and then built an abstraction layer over all the application logs in Kibana, with queries on top of a couple of indices.
        4. The S3 bucket path is not dynamic currently.
    4. Elasticsearch (open for HTTPS only)
      1. Download Elasticsearch, Shield, and the Shield license plugin
      2. install the two plugins for Shield:
        1. bin/plugin install file:///path/to/file/license-2.0.0.zip
        2. bin/plugin install file:///path/to/file/shield-2.0.0.zip
      3. Create Certificate for elasticsearch
        1. openssl req -new -text -out server.req
        2. openssl rsa -in privkey.pem -out server.key
        3. rm privkey.pem
        4. sign the req file (with the organisation's CA) and save the signed certificate as signed.cer
        5. Create a Java keystore with the signed certificate and the private key (2 steps)
          1. openssl pkcs12 -export -name myservercert  -in signed.cer -inkey server.key -out keystore.p12
          2. keytool -importkeystore -destkeystore node01.jks -srckeystore keystore.p12 -srcstoretype pkcs12 -alias myservercert
        6. If you are not using a signing CA, you can use this command in order to create the keystore: keytool -genkey -alias node01_s -keystore node01.jks -keyalg RSA -keysize 2048 -validity 712 -ext san=ip:10.184.2.238
      4. Configuration File
        1. Take a look at the Shield security options. We are using the local Java keystore that holds our signed certificate and the private key (a sketch of the relevant elasticsearch.yml and kibana.yml lines appears right after these steps).
      5. Start Elasticsearch: /apps/elasticsearch-2.0.0/bin/elasticsearch
      6. Add user:  elasticsearch/bin/esusers useradd alog -p alog123 -r admin
      7. Add a template to Elasticsearch - we want all the indices created by Logstash to contain both an analyzed copy of the message for search and a not_analyzed "msg" field for visualization. The template gives us one analyzed field and one that is not.
        1. curl -XPUT https://alog:alog123@elastic-server:9200/_template/template_1 -d '
           {
             "template" : "alog-*",
             "mappings" : {
               "_default_" : {
                 "properties" : {
                   "msg" : {"type" : "string", "index" : "not_analyzed"}
                 }
               }
             }
           }'
        2. This template catches all indices whose names start with "alog".
    5. Kibana (HTTPS communication to Elasticsearch, users log on via HTTPS)
        1. Download and unpack
        2. Start command: bin/kibana
        3. Configuration
          1. In order to enable HTTPS connections for users, we create another certificate/key pair with the openssl tool and set the certificate and key in the configuration.
          2. We set elasticsearch.username and elasticsearch.password for Shield authentication.
          3. We didn't supply Kibana with the Elasticsearch certificate and key, because we used a signed certificate.
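
As promised, here is a rough sketch of the configuration files mentioned above. Treat it strictly as a sketch: the hosts, file paths and the Beats port (5044) are examples I chose, not something the setup mandates, and the Logstash filter section is omitted.

filebeat.yml (Filebeat 1.0.0-beta4):

 filebeat:
   prospectors:
     -
       paths:
         - /var/log/myapp/*.log
       input_type: log
 output:
   logstash:
     hosts: ["logstash-host:5044"]
     tls:
       # trust the CA that signed the Logstash certificate;
       # without a CA, point this at the Logstash server certificate itself
       certificate_authorities: ["/etc/pki/tls/certs/ca.crt"]

logstash.conf:

 input {
   beats {
     port => 5044
     ssl => true
     ssl_certificate => "/etc/pki/tls/certs/logstash-server.crt"
     ssl_certificate_key => "/etc/pki/tls/private/logstash-server.key"
   }
 }
 # filter { ... the grok/mutate steps described above (msg, msg_tokenized, file name) ... }
 output {
   elasticsearch {
     hosts => ["elastic-server:9200"]
     ssl => true
     cacert => "/etc/pki/tls/certs/ca.crt"
     user => "alog"
     password => "alog123"
     index => "alog-%{+YYYY.MM.dd}"
   }
   # plus the s3 output mentioned above
 }

And the security-related parts of elasticsearch.yml and kibana.yml, again only as a sketch (keystore path, password and host names are examples):

 # elasticsearch.yml - Shield SSL settings, using the node01.jks keystore created above
 shield.ssl.keystore.path: /apps/elasticsearch-2.0.0/config/node01.jks
 shield.ssl.keystore.password: changeit
 shield.transport.ssl: true
 shield.http.ssl: true

 # kibana.yml - HTTPS for the users, plus the Shield user created above
 server.ssl.cert: /etc/pki/tls/certs/kibana.crt
 server.ssl.key: /etc/pki/tls/private/kibana.key
 elasticsearch.url: "https://elastic-server:9200"
 elasticsearch.username: "alog"
 elasticsearch.password: "alog123"
 # elasticsearch.ssl.ca is only needed when the Elasticsearch certificate is not signed by a trusted CA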

And that's it :)
I hope that all of the above will help you get through some of the obstacles that we encountered, whether with the SSL setup, with grok, or with anything else.

Good Luck !

Sunday, October 18, 2015

Amazon QuickSight

Amazon QuickSight - a new BI platform from Amazon (and some related stuff that jumped to my mind while reading about it)


It was last year that one of the hottest trends in BI was Elasticsearch + Kibana.
A great database with a great visualization tool on top of it. Pretty close to what Splunk does.

You could hear everywhere things like "you just throw all of your data inside and can query and visualize it in any way you wish".
Managers went after it as if they were hypnotized. It was revolutionary.

But it didn't last long. People started to get that it is cool and nice to have your data sliced and diced so fast and in such beautiful graphs, but in many cases it was only a nice-to-have.

Amazon announced QuickSight last month - https://aws.amazon.com/quicksight/

For me it is a super cheap, easy and elegant way to implement that Elasticsearch + Kibana model, in case you are already on AWS.

I don't see it as a revolution, but as another classic case of a platform being migrated to the cloud - which is quite a big deal, and an enabler for businesses that can't develop these kinds of platforms by themselves.

I also have a problem with the name. Insights are the result of a data model, relations, and the semantics that you give your data. You can throw the best ingredients into the best cooking pot and boil them, but it won't get tasty by itself. Some ingredients require special treatment, some only go with others, and some are not for boiling at all, but for baking.

In general, there is no one solution for BI, and that's why platforms such as Hadoop and S3 are so important for businesses. They are actually a supermarket of data, as my friend Evgeni says. Data analysts can then become chefs and cook their own desired, tasty meal, and Amazon QuickSight is just another recipe.

Still, I believe that customers who are considering moving to AWS would get a lot from QuickSight. It is one of the Amazon services worth getting familiar with, because it is more than just a database - it is a whole solution that customers might really, really like.

And correct me if I am wrong :)

Wednesday, July 1, 2015

Playing with Apache Nifi

I decided to play a little with Apache NiFi.

What is NiFi?

I had some problems with a Flume agent, and someone wrote that I should check out NiFi.

I can't say that it replaces Flume, or any other tool.
According to the official website, it is an easy to use, powerful, and reliable system to process and distribute data.

Well, too general for me. You can read more here.

For me, it is a kind of very configurable streaming engine, with some basic common built-in features, together with a nice graphical interface that is very helpful for building and monitoring the data flow.


At first look, these guys have thought of almost everything - an event merger before writing to HDFS, success/failure routes, easy development of new custom Java processors, a data provenance interface, the ability to store events on disk for recoverability, and many more configurations and features.

That's how it looks:

Updated Merged Flow




In my case, I only had to replace one Flume agent for a specific data source. Flume is probably the best and simplest tool for ingesting data into HDFS, but it might have difficulties in some cases. One of them is ingesting large events (more than 100M).

So, I needed NiFi to get data from IBM MQ. It only supports ActiveMQ right now, so I had to build a new IBM MQ processor.

With good instructions for developing custom processors (here), I managed to add my own GetDataFromIbmMq processor (which is much simpler to use than Flume's JMS source implementation, which requires a binding file). I hope to upload the source code as soon as possible.

The new data stream is actually working very well. Large events come out of the IbmMq processor, get merged, get transformed into sequence files and get written to HDFS. In addition, it is very easy now to send the data anywhere else, or to play with the topology in any way we wish (adding more data sources, more ETL processes and more data stores to save the data in). It is also fantastic that we are able to see how many events went through each processor, how many succeeded and how many failed.


The trend for us right now is to store on HDFS first, which is kind of the opposite of NiFi's focus on stream processing. But still, even for the simple use case of getting data, compressing it and storing it, NiFi is very easy to use and enables new capabilities of data monitoring and provenance.


Must read posts regarding NiFi:
https://blogs.apache.org/nifi/entry/basic_dataflow_design 

https://blogs.apache.org/nifi/entry/stream_processing_nifi_and_spark


Saturday, June 6, 2015

Oozie Hive2 step from Hue's Oozie's workflow editor


Here is a warm recommendation to move to the Hive2 step when running Hive from Oozie, and a way to do that even when using Hue's Oozie workflow editor.



A developer wanted to schedule a complex Hive query on our CDH 5.3.3 cluster (a join over lateral views over a JSON SerDe table) with an Oozie Hive step. The query worked when run from a regular Hive client. The step failed in Oozie due to a log4j permissions exception, before it even started the query's MapReduce. Not very indicative.


After 2 days of debugging (other queries had worked), we decided to try the Hive2 step (it had happened before that the basic Hive step didn't work while the Hive2 step did).
I remember that when upgrading to CDH 5.2, Cloudera wrote that it is recommended to migrate to the Hive2 step (the link), but didn't write anything about specific functionality that won't work with the regular Hive step.

We wrote a test workflow XML with a Hive2 step, ran it with the Oozie CLI, and it worked! That was great, and it made sense, because the Hive2 step acts just like a regular Hive client that connects to the Hive server. That's the natural thing to do, and I don't understand exactly how the basic Hive step works.

Unfortunately, Hue's Oozie workflow editor doesn't support the Hive2 step. That's probably why many people aren't familiar with this step.

That's too bad, because we didn't want to force the developer to write and maintain the Oozie workflow XML without a convenient GUI or API (who wants to edit XML files?). (There is an API that someone from my organization built, but it supports only the FS step, the Pig step and the Hive step - pyoozie.)

The last resort was using a generic step from Hue. We copied the Hive2 step block from the XML into the generic step text box in Hue's workflow editor, and it worked! Victory :)
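
For reference, the block looked roughly like this (a sketch only - the action name, hosts, script path and schema version are examples, and depending on your Hue/Oozie version you may paste just the inner hive2 element rather than the whole action):

 <action name="hive2-step">
   <hive2 xmlns="uri:oozie:hive2-action:0.1">
     <job-tracker>${jobTracker}</job-tracker>
     <name-node>${nameNode}</name-node>
     <jdbc-url>jdbc:hive2://hiveserver2-host:10000/default</jdbc-url>
     <script>${appPath}/complex_query.hql</script>
   </hive2>
   <ok to="end"/>
   <error to="kill"/>
 </action>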

So remember to prefer the Hive2 step (via a generic step) over the classic Oozie Hive step, which is full of bugs and doesn't work right (it doesn't go through the Hive server). In addition, you can try pyoozie, which makes it easier to create Oozie workflows from code.









Thursday, June 4, 2015

Just for fun - my new Surprising alarm clock app

I can't think of a better place than my own blog to officially announce my new Android application - a surprising alarm clock.



It won't take humanity to a better place, but it is fun and I love it :)

The idea is to take a regular alarm clock and add a surprise feature. By surprise I mean choosing x surprise minutes, so that the alarm will go off somewhere between x minutes before the set time and the set time itself.

It is good for people who don't have an exact time to wake up at, and who find it amusing to discover the actual time that the alarm picked.

Here is the link to the Google Play store



I believe that this feature will definitely become a built-in feature in every alarm clock, just like the snooze button. In that case, I should start writing the SDK for the alarm clock surprise :)

Tuesday, May 26, 2015

Parquet Multiple Schema Output Format

From time to time (too many times) I get surprised by an open-source project that lacks some basic feature. I get even more surprised by the (small :) ) number of Google results that correspond to that missing feature.

One example is the Parquet multiple output format.


If you have more than one Parquet schema to write within the same MapReduce job, it is impossible to do so with the current combination of the multiple output format and the Parquet output format.

How come no one has ever needed to do that? We found only one Google result of someone asking about it, and no one answered.

The positive side is that it makes our job relevant and interesting. We, the engineers, can proudly tell stories about how we opened up the code and implemented our own classes, contributing our part to the community. It is much more interesting than using ready-made products and just integrating stuff.

Now back to the Parquet thing. The reason you can't write different schemas within the same job is that the schema is set in the main class of the MapReduce job. All the classes that take part in writing the Parquet records use the first schema that was set when the output format was initiated.

You have to find a way to override the schema from the reducer, according to the current record's schema.



In order to do that, there are 3 places you should take care of.

1+2. In the ParquetOutputFormat.java class

It is called from the reducer in order to get the RecordWriter. We got the schema out of the output path, because we kept a schema-to-path mapping in the configuration.

We replaced the schema in the marked parts below (originally init.getSchema()):


 public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec)  
     throws IOException, InterruptedException {  
   final WriteSupport<T> writeSupport = getWriteSupport(conf);  
   CodecFactory codecFactory = new CodecFactory(conf);  
   long blockSize = getLongBlockSize(conf);  
   if (INFO) LOG.info("Parquet block size to " + blockSize);  
   int pageSize = getPageSize(conf);  
   if (INFO) LOG.info("Parquet page size to " + pageSize);  
   int dictionaryPageSize = getDictionaryPageSize(conf);  
   if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);  
   boolean enableDictionary = getEnableDictionary(conf);  
   if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));  
   boolean validating = getValidation(conf);  
   if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));  
   WriterVersion writerVersion = getWriterVersion(conf);  
   if (INFO) LOG.info("Writer version is: " + writerVersion);  
   WriteContext init = writeSupport.init(conf);  
   // our change: resolve the schema from the output file path instead of using init.getSchema().
   // getSchemaForPath() is our own helper (not part of Parquet) that looks up the
   // schema-to-path mapping we put in the job Configuration.
   MessageType ourSchema = getSchemaForPath(conf, file);
   ParquetFileWriter w = new ParquetFileWriter(conf, ourSchema, file);  // was: init.getSchema()
   w.start();  
   float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,  
     MemoryManager.DEFAULT_MEMORY_POOL_RATIO);  
   long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,  
     MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);  
   if (memoryManager == null) {  
    memoryManager = new MemoryManager(maxLoad, minAllocation);  
   } else if (memoryManager.getMemoryPoolRatio() != maxLoad) {  
    LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not " +  
      "be reset by the new value: " + maxLoad);  
   }  
   return new ParquetRecordWriter<T>(  
     w,  
     writeSupport,  
      ourSchema,  // was: init.getSchema()
     init.getExtraMetaData(),  
     blockSize, pageSize,  
     codecFactory.getCompressor(codec, pageSize),  
     dictionaryPageSize,  
     enableDictionary,  
     validating,  
     writerVersion,  
     memoryManager);  
  }  


3. In the GroupWriter class, which is called from the GroupWriteSupport instance.

  public void write(Group group) {  
    recordConsumer.startMessage();
    // our change: use the schema of the group itself instead of the fixed "schema" field
    writeGroup(group, group.getType());  // was: writeGroup(group, schema)
    recordConsumer.endMessage();
  } 


That's it.

Thanks to Nimrod Parasol and Kiril Yershov for the implementation.

Friday, May 22, 2015

Migrating to Yarn On a Mapreduce and Impala Cluster

Hey

I would like to share with you some insights from the YARN 2.5 migration we have gone through.
We are using CDH 5.2.1, with Impala and MapReduce as the main workloads on the cluster.
Don't take this as a technical guide, but as a technical experience with some interesting points to share.

  • Let the dogs out: The cluster is a multi-tenant cluster used for MapReduce and Cloudera Impala. Before migrating to YARN everything worked great: we allocated static resources to Impala (X GB of RAM per daemon, no limit on CPU) and y and z mappers and reducers for each MapReduce tasktracker, while setting a max Java heap size for mappers and reducers. We had enough memory for both Impala and the MapReduce jobs, and no problem with the CPU either: we set 14 mappers and 14 reducers per 32-core node. The map and reduce slots were always full, so not much CPU was left for the Impala process, but the good thing is that Impala didn't really care and always had enough CPU. The charts showed that Impala uses 1/10 of the CPU compared to the MapReduce jobs. We said that moving to YARN is like letting the dogs out - we didn't know whether they would bite each other or play with each other.

  • The dogs bite each other: After migrating, there were X available vcores for all applications (Impala and MapReduce) on the cluster. The MapReduce jobs behaved as expected and asked for 1 vcore per mapper/reducer. The problem was that Impala asked for lots of vcores - much more than we expected, and much higher than the minimum configuration of the fair scheduler we had set for users. A simple query on a 100K-row Parquet table with statistics asked for X/2 vcores (half of the cluster capacity). We have around 2 queries per second (which requires a huge amount of vcores). It all resulted in 50% of Impala queries failing because of the 5-minute timeout while waiting for resources. On the other hand, important MapReduce jobs didn't get many vcores and spent a lot of time waiting for resources. We saw that users running important MapReduce jobs got 1/10 of the vcores that a simple Impala user who decided to run some queries got. That is an undesirable situation that YARN brought.

  • An Impala CPU is not a MapReduce CPU: Why does Impala ask for so many vcores? Cloudera talks about a lack of table statistics. In addition, according to Nimrod Parasol, Impala opens 1 thread per disk. We have 360 disks in the cluster, so every query was asking for 360 vcores from YARN, which is not acceptable in a cluster with a total of 720 vcores. You can take this as an immature implementation of LLAMA - the mediator between YARN and Impala. We see that when all the CPU slots are used by mappers, the load average is extremely high, but when they are used by Impala queries, the load average is quite low. The conclusion is that Impala uses the CPU in a lighter way than MapReduce. So why should we allocate the same vcores to both MapReduce and Impala, and let the MapReduce jobs wait for a CPU that is being used by Impala, when both could have been served at the same time?

  • Solutions? Let's say we have 1,000 vcores in the cluster. We were willing to set 1,000 vcores for MapReduce, because setting it higher would result in a high load average on the servers (the mappers are usually CPU intensive). We were also willing to set 10,000 vcores for Impala, because we know that when it asks for 100 vcores it is probably physically using far fewer CPUs. That allocation would create a situation where we give virtual CPUs to Impala, but that's OK, because we saw that before YARN we gave all the CPUs to MapReduce and Impala did great. The problem is that YARN and LLAMA won't let us set hierarchical queues - upper limits for MapReduce and Impala, with further allocations for applicative users inside each pool (in order to use hierarchical queues, each user has to set the queue manually, e.g. set "mapreduceQueue.userA").

  • Eventually, we gave up on managing Impala resources with LLAMA and YARN, and stayed with MapReduce 2 (YARN) by itself.


2 more things you should be aware of:

  • High load average on the cluster: another thing we saw is that after moving to MapReduce 2, the servers' load average got higher by almost 100% (250 with YARN, 150 without). We think the reason is that more mappers run simultaneously (because we only set the number of containers per node, not the number of mappers and reducers), compared to when we set y mappers and z reducers per server - and mappers are usually heavier than reducers.

  • Hive on Oozie issue: after migrating to YARN, Hive steps in Oozie workflows failed with "hive-site.xml (permission denied)". The message has lots of references, but nothing related to YARN. Eventually the solution was to create a new share lib that contains the YARN jars, cancel the Oozie share lib mapping parameter (which points to a custom share lib directory), and use the default share lib path: the latest /user/oozie/share/lib/libyyyymmdd directory.

Bottom line: I don't find Impala mature enough to work alongside MapReduce under YARN. A better scheduler with better queue options (hierarchical pools), or a better LLAMA implementation (so Impala would ask for the vcores it really needs), is required.

Good luck everybody.