## This notebook is meant for you to play around with!
## Try investigating `.dask` and other aspects of the taskgraphs below!

In [None]:
import dask
from dask.threaded import get
from dask.local import get_sync
from dask.optimization import cull, inline, inline_functions, fuse

def print_and_return(string):
    print(string)
    return string

def format_str(count, val, nwords):
    return (f'word list has {count} occurrences of '
            f'{val}, out of {nwords} words')

dsk = {'words': 'apple orange apple pear orange pear pear',
       'nwords': (len, (str.split, 'words')),
       'val1': 'orange',
       'val2': 'apple',
       'val3': 'pear',
       'count1': (str.count, 'words', 'val1'),
       'count2': (str.count, 'words', 'val2'),
       'count3': (str.count, 'words', 'val3'),
       'format1': (format_str, 'count1', 'val1', 'nwords'),
       'format2': (format_str, 'count2', 'val2', 'nwords'),
       'format3': (format_str, 'count3', 'val3', 'nwords'),
       'print1': (print_and_return, 'format1'),
       'print2': (print_and_return, 'format2'),
       'print3': (print_and_return, 'format3')}

In [None]:
dask.base.visualize_dsk(dsk, verbose=True)

In [None]:
outputs = ['print1', 'print2']
dsk1, dependencies = cull(dsk, outputs)  # remove unnecessary tasks from the graph

results = get_sync(dsk1, outputs)

dask.base.visualize_dsk(dsk1, verbose=True)

In [None]:
dsk2 = inline(dsk1, dependencies=dependencies)
results = get_sync(dsk2, outputs)

dask.base.visualize_dsk(dsk2, verbose=True)

In [None]:
dsk3 = inline_functions(dsk2, outputs, [len, str.split], dependencies=dependencies)
results = get_sync(dsk3, outputs)

dask.base.visualize_dsk(dsk3, verbose=True)

In [None]:
dsk4, dependencies = fuse(dsk3)
results = get_sync(dsk4, outputs)

dask.base.visualize_dsk(dsk4, verbose=True)

In [None]:
def optimize(dsk, keys):
    dsk1, deps = cull(dsk, keys)
    dsk2 = inline(dsk1, dependencies=deps)
    dsk3 = inline_functions(dsk2, keys, [len, str.split],
                            dependencies=deps)
    dsk4, deps = fuse(dsk3)
    return dsk4, deps

def optimize_and_get(dsk, keys):    
    dsk4, deps = fuse(dsk, keys)
    return get(dsk4, keys)

optimize_and_get(dsk, outputs)

In [None]:
dask.base.visualize_dsk(dsk, verbose=True, color="order")

In [None]:
dsk5, _ = optimize(dsk, outputs)

dask.base.visualize_dsk(dsk5, verbose=True, color="order")

In [None]:
import time

import awkward as ak
import hist
import matplotlib.pyplot as plt
import numpy as np
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from coffea import processor

import dask
import dask_awkward as dak
import hist.dask as hda

# The opendata files are non-standard NanoAOD, so some optional data columns are missing
NanoAODSchema.warn_missing_crossrefs = False

# The warning emitted below indicates steps_per_file is for initial data exploration
# and test. When running at scale there are better ways to specify processing chunks
# of files.
events, report = NanoEventsFactory.from_root(
    {"https://github.com/CoffeaTeam/coffea/raw/master/tests/samples/nano_dy.root": "Events"},
    metadata={"dataset": "Test"},
    uproot_options={"allow_read_errors_with_report": True},
).events()

# Query 1

Plot the <i>E</i><sub>T</sub><sup>miss</sup> of all events.

In [None]:
q1_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="met", label="$E_{T}^{miss}$ [GeV]")
    .Double()
    .fill(events.MET.pt)
)

dask.compute(q1_hist, report)[0].plot1d(flow="none")
dak.necessary_columns(q1_hist)

# Query 2
Plot the <i>p</i><sub>T</sub> of all jets.

In [None]:
q2_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="ptj", label="Jet $p_{T}$ [GeV]")
    .Double()
    .fill(ak.flatten(events.Jet.pt))
)


q2_hist.compute().plot1d(flow="none")
dak.necessary_columns(q2_hist)

# Query 3
Plot the <i>p</i><sub>T</sub> of jets with |<i>η</i>| < 1.

In [None]:
q3_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="ptj", label="Jet $p_{T}$ [GeV]")
    .Double()
    .fill(ak.flatten(events.Jet[abs(events.Jet.eta) < 1].pt))
)

q3_hist.compute().plot1d(flow="none")
dak.necessary_columns(q3_hist)

# Query 4
Plot the <i>E</i><sub>T</sub><sup>miss</sup> of events that have at least two jets with <i>p</i><sub>T</sub> > 40 GeV.

In [None]:
has2jets = ak.sum(events.Jet.pt > 40, axis=1) >= 2
q4_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="met", label="$E_{T}^{miss}$ [GeV]")
    .Double()
    .fill(events[has2jets].MET.pt)
)

q4_hist.compute().plot1d(flow="none")
dak.necessary_columns(q4_hist)

# Query 5
Plot the <i>E</i><sub>T</sub><sup>miss</sup> of events that have an
opposite-charge muon pair with an invariant mass between 60 and 120 GeV.

In [None]:
mupair = ak.combinations(events.Muon, 2, fields=["mu1", "mu2"])
pairmass = (mupair.mu1 + mupair.mu2).mass
goodevent = ak.any(
    (pairmass > 60)
    & (pairmass < 120)
    & (mupair.mu1.charge == -mupair.mu2.charge),
    axis=1,
)
q5_hist = (
    hda.Hist.new.Reg(100, 0, 200, name="met", label="$E_{T}^{miss}$ [GeV]")
    .Double()
    .fill(events[goodevent].MET.pt)
)


q5_hist.compute().plot1d(flow="none")
dak.necessary_columns(q5_hist)

# Query 6
For events with at least three jets, plot the <i>p</i><sub>T</sub> of the trijet four-momentum that has the invariant mass closest to 172.5 GeV in each event and plot the maximum <i>b</i>-tagging discriminant value among the jets in this trijet.

In [None]:
jets = ak.zip(
    {k: getattr(events.Jet, k) for k in ["x", "y", "z", "t", "btag"]},
    with_name="LorentzVector",
    behavior=events.Jet.behavior,
)
trijet = ak.combinations(jets, 3, fields=["j1", "j2", "j3"])
trijet["p4"] = trijet.j1 + trijet.j2 + trijet.j3
trijet = ak.flatten(
    trijet[ak.singletons(ak.argmin(abs(trijet.p4.mass - 172.5), axis=1))]
)
maxBtag = np.maximum(
    trijet.j1.btag,
    np.maximum(
        trijet.j2.btag,
        trijet.j3.btag,
    ),
)
q6_hists = {
    "trijetpt": hda.Hist.new.Reg(
        100, 0, 200, name="pt3j", label="Trijet $p_{T}$ [GeV]"
    )
    .Double()
    .fill(trijet.p4.pt),
    "maxbtag": hda.Hist.new.Reg(
        100, 0, 1, name="btag", label="Max jet b-tag score"
    )
    .Double()
    .fill(maxBtag),
}

out = dask.compute(q6_hists, report)[0]
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 4), sharey=True)
out["trijetpt"].plot1d(ax=ax1, flow="none")
out["maxbtag"].plot1d(ax=ax2, flow="none")
dak.necessary_columns(q6_hists)

# Query 7
Plot the scalar sum in each event of the <i>p</i><sub>T</sub> of jets with <i>p</i><sub>T</sub> > 30 GeV that are not within 0.4 in Δ<i>R</i> of any light lepton with <i>p</i><sub>T</sub> > 10 GeV.

In [None]:
cleanjets = events.Jet[
    ak.all(
        events.Jet.metric_table(events.Muon[events.Muon.pt > 10]) >= 0.4, axis=2
    )
    & ak.all(
        events.Jet.metric_table(events.Electron[events.Electron.pt > 10]) >= 0.4,
        axis=2,
    )
    & (events.Jet.pt > 30)
]
q7_hist = (
    hda.Hist.new.Reg(
        100, 0, 200, name="sumjetpt", label="Jet $\sum p_{T}$ [GeV]"
    )
    .Double()
    .fill(ak.sum(cleanjets.pt, axis=1))
)

q7_hist.compute().plot1d(flow="none")
dak.necessary_columns(q7_hist)

# Query 8
For events with at least three light leptons and a same-flavor opposite-charge light lepton pair, find such a pair that has the invariant mass closest to 91.2 GeV in each event and plot the transverse mass of the system consisting of the missing tranverse momentum and the highest-<i>p</i><sub>T</sub> light lepton not in this pair.

In [None]:
events["Electron", "pdgId"] = -11 * events.Electron.charge
events["Muon", "pdgId"] = -13 * events.Muon.charge
events["leptons"] = ak.concatenate(
    [events.Electron, events.Muon],
    axis=1,
)
events = events[ak.num(events.leptons) >= 3]
pair = ak.argcombinations(events.leptons, 2, fields=["l1", "l2"])
pair = pair[(events.leptons[pair.l1].pdgId == -events.leptons[pair.l2].pdgId)]

pair = pair[
    ak.singletons(
        ak.argmin(
            abs(
                (events.leptons[pair.l1] + events.leptons[pair.l2]).mass
                - 91.2
            ),
            axis=1,
        )
    )
]

events = events[ak.num(pair) > 0]
pair = pair[ak.num(pair) > 0][:, 0]

l3 = ak.local_index(events.leptons)
l3 = l3[(l3 != pair.l1) & (l3 != pair.l2)]
l3 = l3[ak.argmax(events.leptons[l3].pt, axis=1, keepdims=True)]
l3 = events.leptons[l3][:, 0]

mt = np.sqrt(2 * l3.pt * events.MET.pt * (1 - np.cos(events.MET.delta_phi(l3))))
q8_hist = (
    hda.Hist.new.Reg(
        100, 0, 200, name="mt", label="$\ell$-MET transverse mass [GeV]"
    )
    .Double()
    .fill(mt)
)

q8_hist.compute().plot1d(flow="none")
dak.necessary_columns(q8_hist)