Pandora.Apache.Avro.IDL.To.Apache.Parquet
0.11.25
See the version list below for details.
dotnet add package Pandora.Apache.Avro.IDL.To.Apache.Parquet --version 0.11.25
NuGet\Install-Package Pandora.Apache.Avro.IDL.To.Apache.Parquet -Version 0.11.25
<PackageReference Include="Pandora.Apache.Avro.IDL.To.Apache.Parquet" Version="0.11.25" />
paket add Pandora.Apache.Avro.IDL.To.Apache.Parquet --version 0.11.25
#r "nuget: Pandora.Apache.Avro.IDL.To.Apache.Parquet, 0.11.25"
// Install Pandora.Apache.Avro.IDL.To.Apache.Parquet as a Cake Addin #addin nuget:?package=Pandora.Apache.Avro.IDL.To.Apache.Parquet&version=0.11.25 // Install Pandora.Apache.Avro.IDL.To.Apache.Parquet as a Cake Tool #tool nuget:?package=Pandora.Apache.Avro.IDL.To.Apache.Parquet&version=0.11.25
Pandora.Apache.Avro.IDL.To.Apache.Parquet
Table of Contents
Background
Currently, when working with Apache Kafka® and Azure
Databricks® (Apache Spark®), there is a
built-in mechanism to transform Apache Avro® data to Apache
Parquet® files. The issue with this approach, if we think in
medallion lakehouse architecture, is that
AVRO
with nested data, will be persisted in a single PARQUET
file in the
bronze layer (full, raw and unprocessed history of each
dataset) relying on ArrayType
,
MapType
and StructType
to represent the nested data. This will make it a bit
more tedious to post-process data respectively in the following layers: silver
(validated and deduplicated data) and
gold (data as knowledge).
Figure 1: Delta lake medallion architecture and data mesh |
To avoid this issue, we present an open-source library, that will help
transform AVRO
, with nested data, to multiple PARQUET
files where each of
the nested data elements will be represented as an extension table (separate
file). This will allow to merge both the bronze and silver layers
(full, raw and history of each dataset combined with defined structure,
enforced schemas as well validated and deduplicated data), to make it easier
for data engineers/scientists and business analysts to combine data with already
known logic (SQL joins
) and tools.
Figure 2: Azure Databricks python notebook and SQL cell |
As two of the medallion layers are being combined to a single, it might lead to the possible saving of a ⅓ in disk usage. Furthermore, since we aren't relying on a naive approach, when flattening and storing data, it could further lead to greater savings and a more sustainable and environmentally friendly approach.
Figure 3: Green Software Foundation with the Linux Foundation to put sustainability at the core of software engineering |
How to use the library
In order to show how to use the library to convert AVRO
nested data to
PARQUET
files, we will rely on some succinct demo script snippets. The fully
working script is available at: ./demo/avroidl2parquet.fsx.
Package dependencies
#r "nuget: Azure.Storage.Files.DataLake, 12.12.01"
#r "nuget: Microsoft.Extensions.Logging, 7.00.00"
#r "nuget: Newtonsoft.Json, 13.00.02"
#r "nuget: Pandora.Apache.Avro.IDL.To.Apache.Parquet, 0.11.21"
// Specify the local Sample DLL file
#I @"../Pandora.Apache.Avro.IDL.To.Apache.Parquet.Samples/bin/Release/net6.0/"
#r @"Pandora.Apache.Avro.IDL.To.Apache.Parquet.Samples.dll"
For this demo script, besides our own package, we will need the following Microsoft packages:
- Azure.Storage.Files.DataLake: To deliver the created
PARQUET
andCONTROL
files to the delta-lake. - Microsoft.Extensions.Logging: Our library needs an instance of an
ILogger
.
Furthermore, we will also need:
- Newtonsoft.Json: This package is needed to parse and pass the
AVRO
schema to transform it into aPARQUET
schema.
And finally, we will be using a local dotnet
project, containing some of the
AVRO IDL
test samples, taken from Apache AVRO on GitHub.
Package imports
Once we have added the packages to our script, we can then import the following
namespaces
:
open Microsoft.Extensions.Logging
open Azure.Storage.Files.DataLake
open Azure.Storage.Files.DataLake.Models
open Newtonsoft.Json
open Newtonsoft.Json.Linq
open Pandora.Apache
open Pandora.Databricks
open Pandora.Utils
open org.apache.avro
open org.apache.avro.test
Generating random AVRO data
In order to generate random AVRO IDL
data, we will rely on the following
module, which will serialize the specific
data-types and deserialize into
generic
types:
[<RequireQualifiedAccess>]
module Test =
open System.Collections.Generic
let private r = new Random()
// local:
// - org.apache.avro.Interop
let private interop () =
let m = new org.apache.avro.MD5 ()
m.Value <- Array.init 16 (fun _ -> 0x30uy)
let s = new Node ()
s.label <- String.Empty
s.children <- [| |]
let n = new Node ()
n.label <- String.Empty
n.children <- [| s |]
let f = new Foo ()
f.label <- "label"
let d = new Dictionary<string,Foo>()
d.["foo"] <- f
let i = new Interop ()
i.stringField <- String.Empty
i.nullField <- null
i.mapField <- d
i.unionField <-
[| 42.0 :> obj
; [| "bytes is a byte sequence"B |] :> obj
; true :> obj
|][r.Next(0,3)]
i.enumField <- Kind.A
i.fixedField <- m
i.recordField <- n
i
|> Avro.Bytes.Specific.serialize
|> Avro.Bytes.Generic.deserialize (i.Schema.ToString())
// local:
// - org.apache.avro.test.TestRecord
let private testRecord () =
let m = new org.apache.avro.test.MD5()
m.Value <- Array.init 16 (fun _ -> 0x30uy)
let t = new TestRecord ()
t.name <- "name"
t.kind <- Kind.BAZ
t.status <- Status.A
t.hash <- m
t.nullableHash <-
[| null
; m
|][r.Next(0,2)]
t.value <- 42.0
t.average <- 42.0f
t.t <-
[| Unchecked.defaultof<TimeSpan>
; TimeSpan.Zero
|][r.Next(0,2)]
t.l <- 42L
t.a <- [| "string array" |]
t.prop <-
[| null
; "foobar"
|][r.Next(0,2)]
t
|> Avro.Bytes.Specific.serialize
|> Avro.Bytes.Generic.deserialize (t.Schema.ToString())
let private cases =
[| interop
; testRecord
|]
let randomEvent () =
let i =
r.Next
( 0
, Array.length cases
)
cases[i] ()
Logger and DataLakeServiceClient
As our library require to pass an ILogger
we can easily create one as:
let logger () =
let lf = new LoggerFactory ()
lf.CreateLogger ()
For the DataLakeServiceClient
we can create the following value (dlsc
),
which can then be used in the rest of the script without having to send it as a
function parameter:
let dlsc =
"AZURE_DATALAKE_ENV_CONN_STR"
|> Environment.GetEnvironmentVariable
|> fun connStr ->
let opts = new DataLakeClientOptions()
opts.Retry.NetworkTimeout <- TimeSpan.FromMinutes 15 (* In case network is lost *)
DataLakeServiceClient
( connectionString = connStr
, options = opts
)
Loop-logic
For the recursive and asynchronous loop
logic, we will pass the created
logger, a cancellation token and the number of data elements to create of a
given AVRO IDL
instance.
We will then create a UTC
date and timestamp as well as its representation as
a date-time offset:
let dts = DateTime.UtcNow
let off = new DateTimeOffset(dts)
Next step is to define the values for the environment
, AST
and PARQUET
tables:
let env = Parquet.Schema.Ast.Environment.empty ()
let ast = Parquet.Schema.Ast.empty ()
let tabs = Parquet.Tables.empty log ast
We create a sequence of AVRO IDL
test events:
Seq.init n (
fun _ ->
Test.randomEvent ()
)
and we then transform them to PARQUET
tables:
…
|> Seq.iteri (
fun i gen ->
let idx = i + 1
let sha =
gen
|> Avro.Bytes.Generic.serialize
|> Hash.SHA256.Bytes.toBytes
let gn = gen.Schema.Name
let gns = gen.Schema.Namespace
let fqdn =
Parquet.Schema.Ast.Fqdn.FQDN
( gn
, Some gns
)
let (env', ast', es) =
if not (ast.ContainsKey fqdn) then
gen.Schema.ToString()
|> JToken.Parse
|> Avro.Schema.toParquetSchema log None env ast
else
( env
, ast
, Seq.empty
)
if Seq.isEmpty es then
let tabs' = Parquet.Tables.update log (Some tabs) ast'
Parquet.Tables.populate
log
off
(Some sha) None None
ast'
gen
gn (Some gns)
tabs'
if 0 = idx % m then
( Date.timestamp 0
, sprintf "%032i" i
)
||> sprintf "%s | net.pandora.avroidl2parquet | VERBOSE | DEMO | Generated data items: %s"
|> Output.stdout
else
Date.timestamp 0
|> printfn "%s | net.pandora.avroidl2parquet | FAILURE | DEMO | Errors:"
es
|> Seq.iter (
fun e ->
( Date.timestamp 0
, e
)
||> sprintf "%s | net.pandora.avroidl2parquet | FAILURE | DEMO | - %s"
|> Output.stdout
)
)
NOTE: If a given schema is already in the
AST
, we will skip it, as we only parsing once a givenAVRO IDL
schema to aPARQUET
schema.
Once we have generated the PARQUET
tables, we will transform them to bytes
and then store them on the data lake. For this, we will need to define a file
system client:
let fsc =
"AZURE_DATALAKE_DELTA_BLOB"
|> Environment.GetEnvironmentVariable
|> dlsc.GetFileSystemClient
Afterwards, we will iterate over the generated tables, which aren't empty, generate the bytes and then store then in the Azure Tables Storage:
tabs
|> Seq.filter (
fun table -> 0 < table.Value.Count
)
|> Seq.map (
fun table ->
( table
, table.Value
|> Parquet.Tables.toBytes log dts
)
)
|> Seq.iter(
fun (table, parquet) ->
let ppath =
Path.Combine
( "AZURE_DATALAKE_DELTA_PATH"
|> Environment.GetEnvironmentVariable
, table.Key.Replace(".", "/")
, dts.ToString("yyyy-MM-dd")
|> sprintf "pj_pds=%s"
)
(* Submit PARQUET file to Azure Table Storage with enabled Delta Lake *)
let isppath =
new DataLakePathClient
( fileSystemClient = fsc
, path = ppath
)
|> fun dlpc ->
dlpc.ExistsAsync
( cancellationToken = ct
)
|> Async.AwaitTask
|> Async.RunSynchronously
if not isppath.Value then
fsc.CreateDirectoryAsync
( path = ppath
, cancellationToken = ct
)
|> Async.AwaitTask
|> Async.RunSynchronously
|> ignore
let pdc =
ppath
|> fsc.GetDirectoryClient
let pfc =
parquet.Key
|> pdc.GetFileClient
let _ =
use ms = new MemoryStream(parquet.Value)
pfc.UploadAsync
( content = ms
, overwrite = false
, cancellationToken = ct
)
|> Async.AwaitTask
|> Async.RunSynchronously
|> ignore
( Date.timestamp 0
, parquet.Key
)
||> sprintf "%s | net.pandora.avroidl2parquet | VERBOSE | DEMO | Uploaded to the Azure Data Lake: %s"
|> Output.stdout
)
Figure 4: Parquet folder structure on Azure Table Storage |
Delta-control files (optional)
With the code above, we will only add PARQUET
files to the Azure Table
Storage, but if we want to get the benefits of the delta lake, we will need to
provide a JSONL
control for each of the uploaded PARQUET
files. This can be
achieved by modifying the code above like this:
tabs
|> Seq.filter (
fun table -> 0 < table.Value.Count
)
|> Seq.map (
fun table ->
( table
, table.Value
|> Parquet.Tables.toBytes log dts
)
)
|> Seq.iter(
fun (table, parquet) ->
…
(* Submit PARQUET file to Azure Table Storage with enabled Delta Lake *)
…
( Date.timestamp 0
, parquet.Key
)
||> sprintf "%s | net.pandora.avroidl2parquet | VERBOSE | DEMO | Uploaded to the Azure Data Lake: %s"
|> Output.stdout
(* Submit CONTROL file to Azure Table Storage `_delta_log` folder *)
let cpath =
Path.Combine
( "AZURE_DATALAKE_DELTA_PATH"
|> Environment.GetEnvironmentVariable
, table.Key.Replace(".", "/")
, "_delta_log"
)
let iscpath =
new DataLakePathClient
( fileSystemClient = fsc
, path = cpath
)
|> fun dlpc ->
dlpc.ExistsAsync
( cancellationToken = ct
)
|> Async.AwaitTask
|> Async.RunSynchronously
if not iscpath.Value then
fsc.CreateDirectoryAsync
( path = cpath
, cancellationToken = ct
)
|> Async.AwaitTask
|> Async.RunSynchronously
|> ignore
control log ct fsc dts parquet schema cpath
)
where we ensure that a _delta_log
folder exists and is populated by our
control
function which takes: the logger, the cancellation token, the
date-timestamp, the parquet filename & bytes key-value pair, the table schema
and the _delta_log
folder path.
The first thing we need to do, is to find the next index to be used in the delta
lake. It's mandatory that the naming of the sequence of control files is uniform
with no gaps. Once we have found the next index in the sequence, we will
generate a JSONL
control file and we will try to upload it. As the Azure Table
Storage relies on optimistic concurrency, other
process might have added the next control file in the sequence. Therefore, we
will catch the provided error (Azure.RequestFailedException
or
System.AggregateException
) and retry with the next index.
try
let idx =
fsc.GetPathsAsync
( path = cpath
, recursive = false
, cancellationToken = ct
)
|> fun ps ->
ps.GetAsyncEnumerator()
|> Seq.unfold(
fun it ->
let next =
it.MoveNextAsync().AsTask()
|> Async.AwaitTask
|> Async.RunSynchronously
if next then
let name = it.Current.Name
let json =
name
|> Path.GetExtension
|> ((=) ".json")
if not json then
( -1
, it
)
|> Some
else
( name
|> Path.GetFileNameWithoutExtension
|> int
, it
)
|> Some
else
None
)
|> Seq.fold max (-1)
|> ((+) 1)
let jsonl =
DeltaLake.JSONL.init
( log )
( dts )
( parquet.Value.LongLength )
( schema.GetDataFields()
|> DeltaLake.JSONL.Schema.init log
)
( parquet.Key )
|> DeltaLake.toBytes log idx cpath
let jdc =
cpath
|> fsc.GetDirectoryClient
let jfc =
jsonl.Key
|> Path.GetFileName
|> jdc.GetFileClient
let _ =
use ms = new MemoryStream(jsonl.Value)
jfc.UploadAsync
( content = ms
, overwrite = false
, cancellationToken = ct
)
|> Async.AwaitTask
|> Async.RunSynchronously
|> ignore
( Date.timestamp 0
, jsonl.Key
|> Path.GetFileName
)
||> sprintf "%s | net.pandora.avroidl2parquet | VERBOSE | DEMO | Uploaded to the Azure Data Lake: %s"
|> Output.stdout
with
| :? System.AggregateException
| :? Azure.RequestFailedException ->
( Date.timestamp 0
, parquet.Key
)
||> sprintf "%s | net.pandora.avroidl2parquet | WARNING | DEMO | Upload retrying Azure Data Lake: %s"
|> Output.stdout
control log ct fsc dts parquet schema cpath
| ex ->
( Date.timestamp 0
, ex
)
||> sprintf "%s | net.pandora.avroidl2parquet | FAILURE | DEMO | Unexpected error:\n%A"
|> failwith
Figure 5: JSONL control files in _delta_log folder on Azure Table Storage |
Main method
We can now bind the loop to a logic function that will help us to shutdown the
script by pressing ENTER
let logic log cts amount =
[ Async.Control.exit cts
; loop log cts.Token amount
]
|> Async.Choice
let _ =
Date.timestamp 0
|> sprintf "%s | net.pandora.avroidl2parquet | STARTED | DEMO"
|> Output.stdout
try
let sample =
fsi.CommandLineArgs
|> Array.skip 1
|> fun xs ->
if 0 < Array.length xs then
xs.[0]
|> int
else
1
let cts = new CancellationTokenSource()
let log = logger ()
(* Interrupt script by pressing ENTER *)
Date.timestamp 0
|> sprintf "%s | net.pandora.avroidl2parquet | VERBOSE | DEMO | Press ENTER to exit"
|> Output.stdout
logic log cts sample
|> Async.RunSynchronously
|> Option.defaultValue ()
Date.timestamp 0
|> sprintf "%s | net.pandora.avroidl2parquet | STOPPED | DEMO"
|> Output.stdout
00
with ex ->
( Date.timestamp 0
, ex
)
||> sprintf "%s | net.pandora.avroidl2parquet | FAILURE | DEMO | Unexpected error:\n%A"
|> Output.stdout
-1
As mentioned above, the fully working script is available at: ./demo/avroidl2parquet.fsx.
Project dependencies
Library
Dependency | Author | License |
---|---|---|
FSharp.Core | Microsoft | MIT License |
Apache.Avro | The Apache Software Foundation | Apache License 2.0 |
Newtonsoft.Json | James Newton-King | MIT License |
Parquet.Net | Ivan G | MIT License |
Samples
Dependency | Author | License |
---|---|---|
Apache.Avro | The Apache Software Foundation | Apache License 2.0 |
Unit Tests
Dependency | Author | License |
---|---|---|
Microsoft.NET.Test.Sdk | Microsoft | MIT License |
coverlet.collector | .NET foundation | MIT License |
xunit | .NET foundation | Apache License 2.0 |
xunit.runner.visualstudio | .NET foundation | Apache License 2.0 |
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
-
net6.0
- Apache.Avro (>= 1.11.1)
- FSharp.Core (>= 6.0.5)
- Microsoft.Extensions.Logging.Abstractions (>= 7.0.0)
- Newtonsoft.Json (>= 13.0.2)
- Parquet.Net (>= 4.2.2)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
0.11.32 | 261 | 5/10/2023 |
0.11.31 | 221 | 4/17/2023 |
0.11.30 | 268 | 3/21/2023 |
0.11.29 | 266 | 3/14/2023 |
0.11.28 | 272 | 3/6/2023 |
0.11.27 | 276 | 3/6/2023 |
0.11.26 | 288 | 3/6/2023 |
0.11.25 | 274 | 3/4/2023 |
0.11.24 | 288 | 3/4/2023 |
0.11.23 | 262 | 3/4/2023 |
0.11.22 | 263 | 2/24/2023 |
0.11.21 | 292 | 2/16/2023 |
0.11.20 | 283 | 2/16/2023 |
0.11.19 | 281 | 2/15/2023 |
0.11.18 | 289 | 2/15/2023 |
0.11.17 | 286 | 2/15/2023 |
0.11.16 | 272 | 2/15/2023 |
0.11.15 | 292 | 2/14/2023 |
0.11.14 | 286 | 2/14/2023 |
0.11.13 | 288 | 2/14/2023 |
0.11.12 | 295 | 2/14/2023 |
0.11.11 | 288 | 2/14/2023 |
0.11.10 | 277 | 2/14/2023 |
0.11.9 | 278 | 2/14/2023 |
0.11.8 | 280 | 2/14/2023 |
0.11.7 | 294 | 2/14/2023 |
0.11.6 | 301 | 2/13/2023 |
0.11.5 | 305 | 2/13/2023 |
0.11.4 | 317 | 2/8/2023 |
0.11.3 | 307 | 2/8/2023 |
0.11.2 | 312 | 2/6/2023 |
0.11.1 | 315 | 2/6/2023 |
0.11.0 | 332 | 2/3/2023 |