Notes on 1010 in Shakti[0]
==========================
these notes form a rough (a very rough) guide to the Shakti 1010 model.
0. data

the data is assumed to have a specific structure:
<dir>/<file1>..<filen>
each filei object is a dictionary of the form
<`table1>..<`tablen>!(table1;..;tablen)
tables are segmented by date. for example:
tq/2018.05.01
is a dictionary of two tables t and q:
!t
`s`t`e`c`z`p
!q
`s`t`e`b`bz`a`az`m
this convention is significant in selection and tabulation operations.
in the example, we build four days of trade and quote data from a pair of csv files.
\cd tq
t:+`t`e`s`c`z`p !("scncii";",")0:"../csv/t.csv" /trades
q:+`t`e`s`b`bz`a`az`m!("scniiiic";",")0:"../csv/q.csv" /quotes
{($*x)1:`t`q!x,/:/:(t;q)}'+`d!2018.05.01+!4;
\cd ..
loading the directory of n files creates n subprocesses ("workers") each of which maps the corresponding dictionary of trades and quotes.
load"tq/"
.m.M
(+(,`d)!,2018.05.01 2018.05.02 2018.05.03 2018.05.04)!3 4 5 6
.m.M is a map from a table of the dates in tq/ to a vector of handles, each of which connects to the corresponding worker.
it is possible to load tables directly into the root process ("master"):
load"weather.k"
!+read
`id`value
!+stat
`id`name`state`tz`lat`lon`elev`ant
1. ops

the Shakti version of the 1010 query language consists of nine "ops":
load   create worker maps from files in a directory
base   pick a base table from each map
link   join a new table to the base table
asof   asof-join a new table to the base table
willbe define a new column in the base table
sel    select records in the base table
set    set arbitrary state in each worker
get    retrieve tables from the workers
tabu   mapreduce aggregations on each subtable
.m.M K is a map of the "live" handles. K keeps track of which processes contain tables falling within the range of selections made in the course of query execution.
each of the functions listed above has the structure:
{d X}
where X is a parsetree to be executed in each of the workers .m.M K.
here are brief explanations of each of the functions.
1a. load

load:{.m.M:[];."\\l ",x}
the load function creates .m.M, a map from the partition column metadata to worker handles.
i provide load as a function in order to define .m.M in the case where we \l a k script.
1b. base

base:{K::!.m.M;d(b;T::x);$[K;(. .m.M){Z::+`d!y}':*.!K;::];}
`d is the partition column and K is an index into .m.M, the live handles.
d is the distribution function, and is explained in detail below.
b initializes the necessary globals in each worker: T, G, W, L, I, and C.
b:{G::W::L::[];I::!#!*C::(!+!x)!T::x}
in each worker, Z is a table of the single partition column value for that worker. e.g. +`d!2018.05.01.
in each worker, I is a vector of indices into the base table whose name (a symbol) is T. note that the columns of T are a subset of what we refer to as "the base table". the base table consists of T together with columns defined as "willbes" (virtual columns) and columns from foreign tables which are linked into the base table by means of the link and asof functions.
C is the column dictionary. for each column x in the base table, C x is a package of information whose interpretation by the function c retrieves the current state of the value of x. the c function is described in the section on columns below.
W, G, and L are caches for the values of willbe columns, g_ function columns, and link index vectors respectively. the l, g, and w functions memoize these values and store them in L, G, and W.
1c. link, asof

link:{d(s(?;z);x;y^`d);}
asof:{d(s(bin;z);x;y^`d);}
link and asof are lazy: given the arguments necessary to compute the link index vector, they call the k function which creates entries for each column in the foreign table. each column entry is paired with the arguments to link/asof, which are distinguished in C by the use of ? and bin (binary search).
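as an illustration of the two link styles (exact lookup vs. binary search), here is a rough python sketch; the function names and the list-based representation are assumptions of the sketch, not part of the k code.

```python
import bisect

def link_index(base_keys, foreign_keys):
    # exact-match link (the role of ? in k): for each base key,
    # the position of the first matching foreign key, None if absent
    pos = {}
    for i, k in enumerate(foreign_keys):
        pos.setdefault(k, i)
    return [pos.get(k) for k in base_keys]

def asof_index(base_times, foreign_times):
    # asof link (the role of bin in k): for each base time, the
    # position of the last foreign time <= it; foreign_times sorted
    out = []
    for t in base_times:
        i = bisect.bisect_right(foreign_times, t) - 1
        out.append(i if i >= 0 else None)
    return out
```

in the model, index vectors of this kind are what gets cached in L; columns of the foreign table are then simply indexed by them.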
1d. willbe

willbe:{d({C[x]:$["g"=*$*y;(y;I);y]};x;y);}
like link and asof, willbe is lazy. we don't actually compute the value of the willbe column at the point of definition, but rather store its definition in C. where x is a willbe with definition y (a parsetree), C x is simply y.
1e. sel

sel:{$[`d~*x;K@:&&/'K x;d({I@:&!p x;W::L::[]};x)];}
sel is "semilazy". we don't alter the structure of the base table (which wouldn't make sense in any case, since not all columns of the base table are actually present). rather, we calculate the boolean-valued expression x and use it to reset I. thus, if I is 2 3 7 9 11 and the result of &!p x is 1 2 then we reset I to 2 3 7 9 11[1 2], i.e. 3 7.
since all cached values in W and L are now invalid, we must zero them out.
note that sel is where we may reset K, the index into live handles. for example, if the effect of a selection is to restrict subsequent ops to d = 2018.05.02 2018.05.03, then K will be reset to:
+(,`d)!,2018.05.02 2018.05.03
since those workers are correlated exclusively with those dates. in other words, subsequent operations will be distributed to just those workers.
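the semilazy behaviour of sel can be sketched in python. the class is an assumption of the sketch, but I, W and L play the same roles as the worker globals:

```python
class Worker:
    def __init__(self, nrows):
        self.I = list(range(nrows))  # live row indices into the base table
        self.W = {}                  # willbe-value cache
        self.L = {}                  # link-index cache

    def sel(self, mask):
        # mask is the boolean-valued expression, evaluated over the
        # rows currently selected by I; narrow I, then drop the caches,
        # which are now stale
        self.I = [i for i, m in zip(self.I, mask) if m]
        self.W.clear()
        self.L.clear()

w = Worker(12)
w.I = [2, 3, 7, 9, 11]            # after earlier selections
w.sel([False, True, True, False, False])
# w.I is now [3, 7]
```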
1f. get

get:{$[K;,/;::]d({+u!c'u:$[x~`;!C;x]};x)}
unlike the previous five ops, get does not modify state. rather, it returns a table containing zero or more columns in the base table.
1g. set

set:{d({!(;x;y)};x;y);}
since a query in 1010 is a sequence of ops with side-effects (some lazy), set can be used to initialize or modify *state* in each worker.
1h. tabu

tabu:{n$[K;r;a][x;y]z}
like get, tabu returns a result and sets state in the master for subsequent queries. tabu is the most complex of the ops we implement in this model. r (reduce) is applied to the result of m (map). buried in the map function (m) is the worker-call structure. mapreduce is described in a separate section.
2. column mechanics

expressions are passed to op functions as parsetrees (sel, willbe, link, asof) or dictionaries of parsetrees (tabu).
parsetrees can be derived from k expressions with !:
!"+/(a;b)"
((/;+);(;`a;`b))
parsetrees are preprocessed by the p function:
p:{$[`n=@x;(c;,x);1=#x;x;~#x;();@[x;1_!#x;p]]}
which descends through the parsetree structure seeking column references (e.g. `f). when a column reference is found, p substitutes a call to the c function (p;c):
!"b=1"
(=;`b;1)
p@!"b=1"
(=;({$[`n=@c:C x;(!c)[x]I;`n=@*c;(!*c)[x]l . c;2=#c;(g[x]. c)I;w[x]c]};,`b);1)

the c function can be thought of as a "column dispatcher":
c:{$[`n=@c:C x;(!c)[x]I;`n=@*c;(!*c)[x]l . c;2=#c;(g[x]. c)I;w[x]c]}
in order to understand the logic of c, we first need to understand how column data is represented in C.
we let c = C x.
if x is a physical column in the initial table t, then the result is `t.
if x is a linked-in column from a foreign table u, then the result is (u;(c;d);(f;w)), where c and d are lists of column expressions which link u to the base table, f is either ? or bin, depending on the link type (link or asof), and w is a parsetree selection expression to be applied to the foreign table.
if x is a sticky willbe then we call g on x and c and index the result by I.
if x is a willbe, then c is a parsetree which defines x.
c dispatches x (c=C x) accordingly:
physical column  :(!c)[x]I
linked-in column :(!*c)[x]l . c
sticky willbe    :(g[x]. c)I
willbe           :w[x]c
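a small python analogue of the dispatch (assumed names and a much-simplified C): the shape of the entry C x determines how the current value of column x is produced. only the physical and willbe cases are sketched here.

```python
T = {'p': [10, 20, 30, 40]}   # the physical base table (illustrative)
I = [0, 2, 3]                 # live row indices
C = {
    'p':  't',                # physical: just the name of the table
    'p2': ('mul2', 'p'),      # willbe: a tiny stand-in "parsetree"
}

def c(x):
    entry = C[x]
    if isinstance(entry, str):            # physical column: index by I
        return [T[x][i] for i in I]
    op, col = entry                       # willbe: evaluate its definition
    if op == 'mul2':
        return [v * 2 for v in c(col)]
    raise ValueError(op)
```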
the function l either returns the link index vector cached in L, or computes that vector, creates a memoized entry for it in L, and returns it.
l:{u:{+(`$$!#x)!x};$[x in!L;L x;L[x]:z[0][(u@{$[y;x@&x y;x]}[!x;z 1]'*y);u@(!p@)'y 1]]}
note that it is not necessary to flip the arguments and then use ?/: to compute the link index.
the function w either returns the value of the willbe expression cached in W, or computes that value, creates a memoized entry for it in W, and returns it.
w:{$[x in!W;W x;W[x]:!p y]}
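the memoization pattern shared by w (and, with more bookkeeping, by l and g) can be sketched in python; the names here are assumed:

```python
W = {}  # cache of willbe values, keyed by column name

def w(name, compute):
    # return the cached value of willbe `name`, computing and
    # caching it on first use
    if name not in W:
        W[name] = compute()
    return W[name]

calls = []
val  = w('b2', lambda: calls.append(1) or [x * 2 for x in (1, 2, 3)])
val2 = w('b2', lambda: calls.append(1) or [x * 2 for x in (1, 2, 3)])
# compute ran once; both calls return [2, 4, 6]
```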
3. map/reduce

the version of mapreduce used in 1010.k can also be found in m.k, where it is called directly with appropriate arguments. i will use that version.
our example uses a directory tq containing two files, 2018.01.01 and 2018.01.02. each file contains a dictionary of tables; in this case, the dictionary is a singleton with domain `t and range a table of three columns f, g, and h.
we kick things off with:
\l tq/
which creates two workers, each of which maps all the tables in the corresponding dictionary (in this case, just t), and creates .m.M in the master. .m.M is a map from a table with a single column d to the worker handles:
.m.M
(+(,`d)!,2018.01.01 2018.01.02)!9 10
suppose our table t on worker 1 is
f g h
------
1 10 0
1 11 1
2 12 2
and on worker 2 is
f g h
------
1 20 0
2 21 1
2 22 2
3 23 3
and our query is
tabu[();`f]`a`b`c!((sum;`g);(sum;`h);(avg;`h))
then our workers will return
f| 0   1  2
-+---------
1| 21f 1f 2
2| 12f 2f 1
and
f| 0   1  2
-+---------
1| 20f 0f 1
2| 43f 3f 2
3| 23f 3f 1
where 0 is sum g, 1 is sum h, and 2 is count h. note that we compute only the elementary functions used across aggregations.
we obtain our final result by sum- and average-reduction:
f a   b  c
------------------
1 41f 1f 0.3333333
2 55f 5f 1.666667
3 23f 3f 3f
two further facts about mapreduce to bear in mind in the course of the analysis given below:
- there are simple aggregations such as sum, and there are complex aggregations which decompose into simple ones, such as avg, which divides sum by count. thus, we do not reduce averages by averaging, but rather by adding their component sums and dividing by the sum of their component counts.
- if we decompose complex aggregations into elementary ones, and then map these on each worker, we will avoid computing each elementary aggregation more than once. for example, if our tabulation is avg x and sum x, then we will compute sum x just once for both aggregations.
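the two facts above can be sketched in python. this is an illustration under assumed names, not the k code: avg x and sum x share one elementary (sum;x), each worker maps the elementary aggregations per group, and the reduction adds sums and counts componentwise before the final division.

```python
def map_worker(rows, by, elementary):
    # per-worker map: group rows by the `by` column and apply each
    # elementary aggregation (here just sum/count of a column)
    groups = {}
    for r in rows:
        groups.setdefault(r[by], []).append(r)
    out = {}
    for k, g in groups.items():
        out[k] = [sum(r[col] for r in g) if op == 'sum' else len(g)
                  for op, col in elementary]
    return out

def reduce_workers(parts):
    # reduce: add componentwise across workers
    acc = {}
    for part in parts:
        for k, vals in part.items():
            acc[k] = [a + b
                      for a, b in zip(acc.get(k, [0] * len(vals)), vals)]
    return acc

# avg x and sum x share a single (sum, x); count is the only extra
elementary = [('sum', 'x'), ('count', 'x')]
w1 = map_worker([{'f': 1, 'x': 10}, {'f': 1, 'x': 11}], 'f', elementary)
w2 = map_worker([{'f': 1, 'x': 20}], 'f', elementary)
r = reduce_workers([w1, w2])                  # {1: [41, 3]}
avg = {k: s / n for k, (s, n) in r.items()}   # total sum / total count
```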
the suite of functions comprising mapreduce consists of m (map) and r (reduce) and a set of reductions represented by U and V:
/ mapreduce
m:{[t;c;b;a],/$[B;D,/:';{x}]0 key'.m.M[D:#[!.m.M;C#c;()]]2:({#[!x]. y};t;((C:`d in*c)_c;(B:`d~*b)_b;a))}
r:{[t;c;b;a]#[#[m[t;c;b]n!k;();$[`a=@b;!b;b,:()];n!V[*:'k],'n];();f[k!n:`$$!#k:?,/v'a]'a:u'a]}
/ reductions
U:(avg;var;dev)!({(%;(sum;x);(count;x))};{({(x%z)y*y%:z};(sum;(*;x;x));(sum;x);(count;x))};{(%:;(var;x))})
u:{$[@x;x;f:U@*x;u'f . 1_x;u'x]}
V!:V:(last;sum;min;max);V,:(#:;?:)!(+/;?,/)
v:{$[@x;();V@*x;,x;,/v'x]}
f:{$[@y;y;n:x y;n;f[x]'y]}
in order to follow the execution of our example, i've rewritten m and r as mm and rr, which take tiny steps in the computation and trace the result of each step in the console.
mm:{[t;c;b;a]
trace["mm:"] {[t;c;b;a]}
trace["t:"] t
trace["c:"] c
trace["b:"] b
trace["a:"] a
trace["d:(B:`d~*b)_b"] d:(B:`d~*b)_b
trace["e:(C:`d in*c)_c"] e:(C:`d in*c)_c
trace["f:{#[!x]. y}"] f:{#[!x]. y}
trace["g:(f;t;(e;d;a))"] g:(f;t;(e;d;a))
trace["D:#[!.m.M;C#c;()]"] D:#[!.m.M;C#c;()]
trace["q:.m.M[D]2:g"] q:.m.M[D]2:g
trace["r:0 key'q"] r:0 key'q
trace["h:$[B;D,/:';{x}]"] h:$[B;D,/:';{x}]
trace["s:,/h r"] s:,/h r
s}
rr:{[t;c;b;a]
trace[""] "press enter to advance through the calculation"
trace["rr:"] {[t;c;b;a]}
trace["t:"] t
trace["c:"] c
trace["b:"] b
trace["a:"] a
trace["a:u'a"] a:u'a
trace["k:?,/v'a"] k:?,/v'a
trace["n:`$$!#k"] n:`$$!#k
trace["h:mm[t;c;b]n!k"] h:mm[t;c;b]n!k
trace["d:f[k!n]'a"] d:f[k!n]'a
trace["e:n!V[*:'k],'n"] e:n!V[*:'k],'n
trace["g:$[`a=@b;!b;b,:()]"] g:$[`a=@b;!b;b,:()]
trace["r:#[h;();g;e]"] r:#[h;();g;e]
trace["s:#[r;();d]"] s:#[r;();d]
s}
/ example mapreduce
rr[`t;();`f]`a`b`c!((sum;`g);(sum;`h);(avg;`h))
rr and mm are both called with table t, whereclause c, byclause b, and a dictionary of aggregation parsetrees a:
rr:{[t;c;b;a]}
t:`t
c:()
b:`f
a:

--+-------
`a| sum `g
`b| sum `h
`c| avg `h
reduce each aggregation to an expression in elementary operations:
a:u'a
[a:(sum;`g);b:(sum;`h);c:(%;(sum;`h);(#:;`h))]
find the unique elementary operations:
k:?,/v'a
((sum;`g);(sum;`h);(#:;`h))
n is a vector of synthetic domain names:
n:`$$!#k
`0`1`2
call mm with t=t, c=c, b=b, and a=n!k:
mm:{[t;c;b;a]}
t:`t
c:()
b:`f
a:

--+-------
`0| sum `g
`1| sum `h
`2| #: `h
`d is the segmentation column; it is not a column in t, so it may not appear in the executable byclause or whereclause. d is the byclause with `d removed:
d:(B:`d~*b)_b
,`f
e is the whereclause with constraints referring to `d removed:
e:(C:`d in*c)_c
()
the selection function to be executed in the server:
f:{#[!x]. y}
{#[!x]. y}
g = (selection function;table;whereclause;byclause;aggregations):
g:(f;t;(e;d;a))
({#[!x]. y};`t;(();,`f;[0:(sum;`g);1:(sum;`h);2:(#:;`h)]))
D = dates selected in the whereclause c:
D:#[!.m.M;C#c;()]
d
----------
2018.01.01
2018.01.02
q = selection from each server:
q:.m.M[D]2:g
f| 0   1  2
-+---------
1| 21f 1f 2
2| 12f 2f 1
f| 0   1  2
-+---------
1| 20f 0f 1
2| 43f 3f 2
3| 23f 3f 1
dekey each result:
r:0 key'q
f 0   1  2
----------
1 21f 1f 2
2 12f 2f 1
f 0   1  2
----------
1 20f 0f 1
2 43f 3f 2
3 23f 3f 1
glue D to each record if `d is in the original byclause:
h:$[B;D,/:';{x}]
{x}
raze the dekeyed results (*** return to rr):
s:,/h r
f 0   1  2
----------
1 21f 1f 2
2 12f 2f 1
1 20f 0f 1
2 43f 3f 2
3 23f 3f 1
*** back in rr. h is the result s of mm. note that the columns contain the unique elementary calculations required for the reduction phase (and that the value-column names are synthetic). we will do the initial aggregation on h:
h:mm[t;c;b]n!k
f 0   1  2
----------
1 21f 1f 2
2 12f 2f 1
1 20f 0f 1
2 43f 3f 2
3 23f 3f 1
f (the global function; see above) maps actual column names in the aggregation dictionary to synthetic domain names:
d:f[k!n]'a
[a:`0;b:`1;c:(%;`1;`2)]
e says how to aggregate the elementary operations. in this case, we sum both sums and counts:
e:n!V[*:'k],'n

--+-------
`0| sum `0
`1| sum `1
`2| sum `2
normalize the byclause:
g:$[`a=@b;!b;b,:()]
,`f
aggregate the results of the elementary operations:
r:#[h;();g;e]
f| 0   1  2
-+---------
1| 41f 1f 3f
2| 55f 5f 3f
3| 23f 3f 1f
and finally calculate the aggregations themselves (sum, average, &c.):
s:#[r;();d]
f| a   b  c
-+-----------------
1| 41f 1f 0.3333333
2| 55f 5f 1.666667
3| 23f 3f 3f
Footnotes

[0] this shakti implementation continues the sequence of k3 implementations described here:
http://www.nsl.com/k/k3_1010/1010.txt
the unoptimized single table version for k3 is:
http://www.nsl.com/k/k3_1010/1010_0.k
and the optimizing version is:
http://www.nsl.com/k/k3_1010/1010_1.k
[1] in q (k4) we have
q)a:([]d:10 20 30)
q)b:0 1 2
q)a!b
d |
--| -
10| 0
20| 1
30| 2
also note that in shakti we recommend a verbal distinction between dictionaries, which are maps from a domain of names, and the more general form whose domain is a list of objects.
[2] in k4:
select a by b from t where c
[3] in k4:
select a from t where c.