1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
//! This is the main module, and contains the definition of the orchestrator
use std::{
    any::TypeId, collections::HashMap, error::Error, future::Future, marker::PhantomData, mem, ops::Deref, pin::Pin, sync::Arc
};

use crate::prelude::*;
use async_trait::async_trait;
use tokio::sync::{Notify, Semaphore};

#[derive(Debug, thiserror::Error)]
/// Possible errors returned from the orchestrator
pub enum OrchestratorError {
    /// It was not possible to found the requested resource
    #[error("NotFound")]
    NotFound,
    /// Failed Authentication
    #[error("Failing Autentication: {0}")]
    FailingAutentication(ExerciseResult),

    /// Execution Error
    #[error("Execution Error: {0}")]
    ExecutionError(#[from] Box<dyn Error + Send + Sync>),
}

/// which result should a complete execution return?
pub type ResultOutput = Result<ExerciseResult, Box<dyn Error>>;
/// wrap ResultOutput in a dynamic Future
pub type ResultFuture = Pin<Box<dyn Send + Sync + Future<Output = ResultOutput>>>;

/// Type of a dynamic Function that returns a ResultFuture. It takes as input an ExerciseDefinition and a String
pub type Func = dyn Send + Sync + Fn(&dyn ExerciseDef, String) -> ResultFuture;

/// What does an exercise generator need to return?
pub type ExerciseGeneratorFuture<S> =
    Pin<Box<dyn Send + Sync + Future<Output = Result<S, Box<dyn Error + Send + Sync + 'static>>>>>;
/// Dyncamic function, it returns an ExerciseGeneratorFuture
pub type ExerciseGenerator<S> = Box<dyn Send + Sync + Fn(String) -> ExerciseGeneratorFuture<S>>;

/// Add user source code to the ExerciseDef
pub type UserSrcAdder<S> = Box<dyn Send + Sync + Fn(S, String) -> ExerciseGeneratorFuture<S>>;

/// Which error should the implementation return?
pub type DynError = Box<dyn Error + Send + Sync>;


/*
Definition: ExecutorState + ExerciseDef + Into<S> + TryFrom<S>,
        DefinitionWithSource: ExecutorState + Into<S>,
        F: Future<Output = Result<Definition, Box<dyn Error + Send + Sync + 'static>>>
            + 'static
            + Send
            + Sync,*/
pub type ExerciseDefinitionFuture = Pin<Box<dyn Send + Sync + Future<Output=Result<Box<dyn ExerciseDef>, Box<dyn Error + Send + Sync + 'static>>>>>;
pub type ExerciseDefinitionFunction = Box<dyn Send + Sync + Fn(String) -> ExerciseDefinitionFuture>;
/// The main struct, it orchestrates all plugins, executors, memory ecc...
pub struct Orchestrator<S: ExecutorGlobalState> {
    ph: PhantomData<S>,
    memory: Box<dyn Memory<S>>,
    /// save all executors
    pub executors: HashMap<(TypeId, TypeId), Executor<S>>,
    /// executor generator saved
    pub exercise_generators: HashMap<TypeId, (ExerciseGenerator<S>, UserSrcAdder<S>)>,

    execise_definition: HashMap<TypeId, ExerciseDefinitionFunction>,

    check_when_add: bool,
    /// saved plugin, runned with run method
    plugins: Vec<Box<dyn InnerPlugin<S>>>,
    /// semaphore to keep track of concurrent exercise execution
    execution_semaphore: Semaphore,
}

impl<S: ExecutorGlobalState> Orchestrator<S> {
    /// Constructor, it takes as input the total number of permits available for execution and a Memory
    pub fn new(execution_permits: usize, check_when_add: bool, memory: Box<dyn Memory<S>>) -> Self {
        Orchestrator {
            ph: PhantomData,
            executors: HashMap::new(),
            exercise_generators: HashMap::new(),
            execise_definition: HashMap::new(),
            check_when_add,
            memory,
            plugins: Vec::new(),
            execution_semaphore: Semaphore::new(execution_permits),
        }
    }
}
impl<S: ExecutorGlobalState> Orchestrator<S> {
    /// process the given exercise (name), and deliver the source (s). it gives back an ExerciseResult if all is gone well
    pub async fn process_exercise(
        &self,
        name: String,
        source: String,
        user: User<Authenticated>,
    ) -> Result<ExerciseResult, DynError> {
        let id = self
            .memory
            .add_submission(name.clone(), source.clone(), user.clone())
            .await?;
        let lock = self.execution_semaphore.acquire().await?;
        let generated = self
            .generate_exercise(name.to_string(), source.to_string())
            .await?;
        let final_state = self.run_state(generated).await?;
        let result: ExerciseResult =
            TryInto::try_into(final_state).map_err(|_| "wrong result returned")?;
        mem::drop(lock);
        self.memory
            .add_exercise_result(id, user.clone(), result.clone())
            .await?;
        Ok(result)
    }

    /// add exercise,
    /// Then tries to do a normal execution, and check if it does indeed return full score
    /// if not returns an error (and obviusly doesn't add it)
    pub async fn add_exercise<ExerciseType: ExerciseDef + ExecutorState>(
        & self,
        name: &str,
        source: &str,
    ) -> Result<(), DynError> {
        let (generator, src_adder) = self
            .exercise_generators
            .get(&TypeId::of::<ExerciseType>())
            .ok_or(OrchestratorError::NotFound)?;
        let exercise_def = generator(source.to_string()).await?;
        if self.check_when_add{
             //test
            
            let exercise_with_solution = src_adder(exercise_def.clone(), source.to_string()).await?;
            let results = self.run_state(exercise_with_solution).await?;
            let results: ExerciseResult = results
                .try_into()
                .map_err(|_| "not found an exercise result")?;

            //check if we get all the points
            let all_ok = results
                .tests
                .values()
                .all(|x| x.compiled == CompilationResult::Built && x.runned == RunResult::Ok);
            if !all_ok {
                Err(format!(
                    "can't get all the points. Returned this result {:?}",
                    results
                ))?
            }
        }
       
        self.memory
            .add_exercise(name.to_string(), exercise_def, source.to_string())
            .await?;
        Ok(())
    }
    ///get and execute plan
    pub async fn run_state(&self, mut cur: S) -> Result<S, DynError> {
        let plan = self.memory.get_execution_plan(&cur).await?;
        for (from, to, data) in plan {
            let func = self
                .executors
                .get(&(from, to))
                .ok_or("executor not registered")?;
            cur = func(cur, data).await?;
        }
        Ok(cur)
    }

    /// adds an exercise generator to the orchestrator
    ///
    /// NB: it does not check if it's correct or not
    /// if a generator is already present it gets overriten

    pub async fn add_exercise_generators<Definition, DefinitionWithSource, F, F2>(
        &mut self,
        exercise_gen: fn(String) -> F,
        source_add: fn(Definition, String) -> F2,
    ) where
        Definition: ExecutorState + ExerciseDef + Into<S> + TryFrom<S>,
        DefinitionWithSource: ExecutorState + Into<S>,
        F: Future<Output = Result<Definition, Box<dyn Error + Send + Sync + 'static>>>
            + 'static
            + Send
            + Sync,
        F2: Future<Output = Result<DefinitionWithSource, Box<dyn Error + Send + Sync + 'static>>>
            + 'static
            + Send
            + Sync,
    {
        // wrap in a generic function
        let e = exercise_gen.clone();
        let exercise_def = move |template: String| {
            let t: ExerciseDefinitionFuture = Box::pin(async move {
                let t = e(template).await?;
                let t: Box<dyn ExerciseDef> = Box::new(t);
                Ok(t)
            });
            t
        };
        self.execise_definition.insert(TypeId::of::<Definition>(), Box::new(exercise_def));
        let exercise_gen = move |template: String| {
            let t: ExerciseGeneratorFuture<S> = Box::pin(async move {
                let ret = exercise_gen(template).await?;
                let ret: S = ret.into();
                Ok::<S, Box<dyn Error + Send + Sync + 'static>>(ret)
            });
            t
        };
        let source_add = move |definition: S, source: String| {
            let t: ExerciseGeneratorFuture<S> = Box::pin(async move {
                let definition = <S as TryInto<Definition>>::try_into(definition)
                    .map_err(|_| "not a valid input")?;
                let ret = source_add(definition, source).await?;
                let ret: S = ret.into();
                Ok::<S, Box<dyn Error + Send + Sync + 'static>>(ret)
            });
            t
        };

        self.exercise_generators.insert(
            TypeId::of::<Definition>(),
            (Box::new(exercise_gen), Box::new(source_add)),
        );
    }
    /// generate an exercise from a name and a source-code
    async fn generate_exercise(&self, name: String, source: String) -> Result<S, DynError> {
        let (ty, template) = self.memory.get_exercise(name).await?;
        let (generator, source_adder) = self.exercise_generators.get(&ty).ok_or("not found")?;
        let generated = generator(template).await?;
        let added = source_adder(generated, source).await?;
        Ok(added)
    }
    pub async fn get_exercise_info(&self, name: String)-> Result<Box<dyn ExerciseDef>, DynError>{
        let (ty, template) = self.memory.get_exercise(name).await?;
        let s  =self.execise_definition.get(&ty).ok_or("not found")?(template).await?;
        Ok(s)
    }


    /// Adds a plugin to the orchestrator
    pub async fn add_plugin<P: Plugin<S> + 'static>(&mut self, mut p: P) -> Result<(), DynError> {
        p.on_add(self).await?;
        let to_push = Box::new(PluginStorage::new(p));
        self.plugins.push(to_push);
        Ok(())
    }

    /// Runs the Orchestrator.
    pub async fn run(mut self) -> OrchestratorReference<S> {
        let mut to_run = Vec::new();
        mem::swap(&mut to_run, &mut self.plugins);
        let o = self.as_ref();
        let n = Arc::new(Notify::new());
        for mut cur in to_run {
            let o = o.clone();
            let n = n.clone();
            let to_run = async move {
                cur.run(o.clone(), n).await.unwrap();
            };
            tokio::spawn(to_run);
        }
        n.notified().await;
        o
    }
    /// get a reference to the internal memory
    pub fn memory(&self) -> &dyn Memory<S> {
        self.memory.as_ref()
    }

    /// Enables a particular executor
    pub async fn enable_executor<
        Input: ExecutorState + TryFrom<S> + Into<S>,
        Output: ExecutorState + Into<S>,
        Data: Serialize,
    >(
        &mut self,
        data: Data,
    ) -> Result<(), DynError> {
        use crate::executor::AddExecutor;
        self.enable_executor_typed(
            &Input::async_default().await,
            &Output::async_default().await,
            data,
        )
        .await?;
        Ok(())
    }
}

#[derive(Clone)]
/// A shared reference to the orchestrator
pub struct OrchestratorReference<S: ExecutorGlobalState> {
    inner: Arc<Orchestrator<S>>,
}
impl<S: ExecutorGlobalState> Deref for OrchestratorReference<S> {
    type Target = Orchestrator<S>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
impl<S: ExecutorGlobalState> Orchestrator<S> {
    /// returns a reference to the orchestrator
    pub fn as_ref(self) -> OrchestratorReference<S> {
        OrchestratorReference {
            inner: Arc::new(self),
        }
    }
}
#[async_trait]
/// Reference without state
pub trait ReferenceWithoutState: Send + Sync + 'static {
    /// from exercise name, source string, and user authenticated
    async fn process_exercise(
        &self,
        name: String,
        s: String,
        user: User<Authenticated>,
    ) -> Result<ExerciseResult, DynError>;
    /// returns a memory reference (without state)
    fn memory(&self) -> &dyn StatelessMemory;
    //fn deref(&self) -> &Orchestrator<impl ExecutorState>;
}
#[async_trait]
impl<S: ExecutorGlobalState> ReferenceWithoutState for OrchestratorReference<S> {
    /*fn add_plugin<P: Plugin + 'static>(&mut self, p: P) {
        todo!()
    }*/
    fn memory(&self) -> &dyn StatelessMemory {
        self.memory.as_stateless()
    }

    async fn process_exercise(
        &self,
        name: String,
        s: String,
        user: User<Authenticated>,
    ) -> Result<ExerciseResult, DynError> {
        Ok(self.inner.process_exercise(name, s, user).await?)
    }
}

#[cfg(test)]
mod tests {

    use crate as orchestrator;
    use crate::{
        prelude::{Orchestrator, OrchestratorReference},
        GenerateState,
    };
    GenerateState!(ExerciseResult);

    #[test]
    fn test_syncness() {
        fn is_sync<T: Sync>() {}
        fn is_send<T: Send>() {}
        is_sync::<Orchestrator<State>>();
        is_send::<&Orchestrator<State>>();
        is_sync::<&Orchestrator<State>>();
        is_send::<OrchestratorReference<State>>();
        is_sync::<OrchestratorReference<State>>();
    }
}